You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2020/12/23 10:54:00 UTC
[ignite] branch master updated: IGNITE-13720 Parallelism for
defragmentation added. - Fixes #8574.
This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 5c2bcd3 IGNITE-13720 Parallelism for defragmentation added. - Fixes #8574.
5c2bcd3 is described below
commit 5c2bcd30ae907771b991bfd4a156e0a7fda6713c
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Wed Dec 23 13:46:12 2020 +0300
IGNITE-13720 Parallelism for defragmentation added. - Fixes #8574.
Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
.../configuration/DataStorageConfiguration.java | 30 +++
.../GridCacheDatabaseSharedManager.java | 3 +-
.../CachePartitionDefragmentationManager.java | 245 +++++++++++++--------
.../persistence/defragmentation/PageStoreMap.java | 8 +-
.../processors/query/GridQueryIndexing.java | 6 +-
.../processors/query/DummyQueryIndexing.java | 4 +-
.../processors/query/h2/IgniteH2Indexing.java | 15 +-
.../defragmentation/IndexingDefragmentation.java | 184 ++++++++++------
.../IgnitePdsIndexingDefragmentationTest.java | 8 +-
9 files changed, 326 insertions(+), 177 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
index 2fe4824..6f5b338 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
@@ -179,6 +179,9 @@ public class DataStorageConfiguration implements Serializable {
/** Default wal compaction level. */
public static final int DFLT_WAL_COMPACTION_LEVEL = Deflater.BEST_SPEED;
+ /** Default defragmentation thread pool size. */
+ public static final int DFLT_DEFRAGMENTATION_THREAD_POOL_SIZE = 4;
+
/** Default compression algorithm for WAL page snapshot records. */
public static final DiskPageCompression DFLT_WAL_PAGE_COMPRESSION = DiskPageCompression.DISABLED;
@@ -320,6 +323,9 @@ public class DataStorageConfiguration implements Serializable {
/** Encryption configuration. */
private EncryptionConfiguration encCfg = new EncryptionConfiguration();
+ /** Maximum number of partitions which can be defragmented at the same time. */
+ private int defragmentationThreadPoolSize = DFLT_DEFRAGMENTATION_THREAD_POOL_SIZE;
+
/**
* Creates valid durable memory configuration with all default values.
*/
@@ -1173,6 +1179,30 @@ public class DataStorageConfiguration implements Serializable {
return dfltWarmUpCfg;
}
+ /**
+ * Sets maximum number of partitions which can be defragmented at the same time.
+ *
+ * @param defragmentationThreadPoolSize Maximum number of partitions which can be defragmented at the same time.
+ * Default is {@link DataStorageConfiguration#DFLT_DEFRAGMENTATION_THREAD_POOL_SIZE}.
+ * @return {@code this} for chaining.
+ */
+ public DataStorageConfiguration setDefragmentationThreadPoolSize(int defragmentationThreadPoolSize) {
+ A.ensure(defragmentationThreadPoolSize > 1, "Defragmentation thread pool size must be greater or equal to 1.");
+
+ this.defragmentationThreadPoolSize = defragmentationThreadPoolSize;
+
+ return this;
+ }
+
+ /**
+ * Maximum number of partitions which can be defragmented at the same time.
+ *
+ * @return Thread pool size for defragmentation.
+ */
+ public int getDefragmentationThreadPoolSize() {
+ return defragmentationThreadPoolSize;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DataStorageConfiguration.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 4429ec7..06e00f9 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -784,7 +784,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
(FilePageStoreManager)cctx.pageStore(),
checkpointManager,
lightCheckpointMgr,
- persistenceCfg.getPageSize()
+ persistenceCfg.getPageSize(),
+ persistenceCfg.getDefragmentationThreadPoolSize()
);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
index 48616b6..b1682ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -71,8 +72,10 @@ import org.apache.ignite.internal.processors.cache.tree.PendingRow;
import org.apache.ignite.internal.processors.query.GridQueryIndexing;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.collection.IntHashMap;
import org.apache.ignite.internal.util.collection.IntMap;
+import org.apache.ignite.internal.util.collection.IntRWHashMap;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -82,6 +85,8 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.maintenance.MaintenanceRegistry;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+import org.jetbrains.annotations.Nullable;
import static java.util.Comparator.comparing;
import static java.util.stream.StreamSupport.stream;
@@ -158,6 +163,9 @@ public class CachePartitionDefragmentationManager {
/** */
private final GridFutureAdapter<?> completionFut = new GridFutureAdapter<>();
+ /** Checkpoint runner thread pool. If null tasks are to be run in single thread */
+ @Nullable private volatile IgniteThreadPoolExecutor defragmentationThreadPool;
+
/**
* @param cacheNames Names of caches to be defragmented. Empty means "all".
* @param sharedCtx Cache shared context.
@@ -174,7 +182,8 @@ public class CachePartitionDefragmentationManager {
FilePageStoreManager filePageStoreMgr,
CheckpointManager nodeCheckpoint,
LightweightCheckpointManager defragmentationCheckpoint,
- int pageSize
+ int pageSize,
+ int defragmentationThreadPoolSize
) throws IgniteCheckedException {
cachesForDefragmentation = new HashSet<>(cacheNames);
@@ -190,6 +199,15 @@ public class CachePartitionDefragmentationManager {
partDataRegion = dbMgr.dataRegion(DEFRAGMENTATION_PART_REGION_NAME);
mappingDataRegion = dbMgr.dataRegion(DEFRAGMENTATION_MAPPING_REGION_NAME);
+
+ defragmentationThreadPool = new IgniteThreadPoolExecutor(
+ "defragmentation-worker",
+ sharedCtx.igniteInstanceName(),
+ defragmentationThreadPoolSize,
+ defragmentationThreadPoolSize,
+ 30_000,
+ new LinkedBlockingQueue<Runnable>()
+ );
}
/** */
@@ -353,103 +371,24 @@ public class CachePartitionDefragmentationManager {
defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock();
}
- IntMap<LinkMap> linkMapByPart = new IntHashMap<>();
-
- for (CacheDataStore oldCacheDataStore : oldCacheDataStores) {
- checkCancellation();
-
- int partId = oldCacheDataStore.partId();
+ IntMap<LinkMap> linkMapByPart = new IntRWHashMap<>();
- PartitionContext partCtx = new PartitionContext(
- workDir,
- grpId,
- partId,
- partDataRegion,
- mappingDataRegion,
+ IgniteUtils.doInParallel(
+ defragmentationThreadPool,
+ oldCacheDataStores,
+ oldCacheDataStore -> defragmentOnePartition(
oldGrpCtx,
+ grpId,
+ workDir,
+ offheap,
+ pageStoreFactory,
+ cmpFut,
+ oldPageMem,
newGrpCtx,
- cacheDataStores.get(partId),
- pageStoreFactory
- );
-
- if (skipAlreadyDefragmentedPartition(workDir, grpId, partId, log)) {
- partCtx.createPageStore(
- () -> defragmentedPartMappingFile(workDir, partId).toPath(),
- partCtx.mappingPagesAllocated,
- partCtx.mappingPageMemory
- );
-
- linkMapByPart.put(partId, partCtx.createLinkMapTree(false));
-
- continue;
- }
-
- partCtx.createPageStore(
- () -> defragmentedPartMappingFile(workDir, partId).toPath(),
- partCtx.mappingPagesAllocated,
- partCtx.mappingPageMemory
- );
-
- linkMapByPart.put(partId, partCtx.createLinkMapTree(true));
-
- checkCancellation();
-
- partCtx.createPageStore(
- () -> defragmentedPartTmpFile(workDir, partId).toPath(),
- partCtx.partPagesAllocated,
- partCtx.partPageMemory
- );
-
- partCtx.createNewCacheDataStore(offheap);
-
- copyPartitionData(partCtx, treeIter);
-
- DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partCtx.partPageMemory.pageManager();
-
- PageStore oldPageStore = filePageStoreMgr.getStore(grpId, partId);
-
- status.onPartitionDefragmented(
- oldGrpCtx,
- oldPageStore.size(),
- pageSize + partCtx.partPagesAllocated.get() * pageSize // + file header.
- );
-
- //TODO Move inside of defragmentSinglePartition.
- IgniteInClosure<IgniteInternalFuture<?>> cpLsnr = fut -> {
- if (fut.error() != null)
- return;
-
- if (log.isDebugEnabled()) {
- log.debug(S.toString(
- "Partition defragmented",
- "grpId", grpId, false,
- "partId", partId, false,
- "oldPages", oldPageStore.pages(), false,
- "newPages", partCtx.partPagesAllocated.get() + 1, false,
- "mappingPages", partCtx.mappingPagesAllocated.get() + 1, false,
- "pageSize", pageSize, false,
- "partFile", defragmentedPartFile(workDir, partId).getName(), false,
- "workDir", workDir, false
- ));
- }
-
- oldPageMem.invalidate(grpId, partId);
-
- partCtx.partPageMemory.invalidate(grpId, partId);
-
- pageMgr.pageStoreMap().removePageStore(grpId, partId); // Yes, it'll be invalid in a second.
-
- renameTempPartitionFile(workDir, partId);
- };
-
- GridFutureAdapter<?> cpFut = defragmentationCheckpoint
- .forceCheckpoint("partition defragmented", null)
- .futureFor(CheckpointState.FINISHED);
-
- cpFut.listen(cpLsnr);
-
- cmpFut.add((IgniteInternalFuture<Object>)cpFut);
- }
+ linkMapByPart,
+ oldCacheDataStore
+ )
+ );
// A bit too general for now, but I like it more then saving only the last checkpoint future.
cmpFut.markInitialized().get();
@@ -551,6 +490,119 @@ public class CachePartitionDefragmentationManager {
}
}
+ /**
+ * Defragment one given partition.
+ */
+ private boolean defragmentOnePartition(
+ CacheGroupContext oldGrpCtx,
+ int grpId,
+ File workDir,
+ GridCacheOffheapManager offheap,
+ FilePageStoreFactory pageStoreFactory,
+ GridCompoundFuture<Object, Object> cmpFut,
+ PageMemoryEx oldPageMem,
+ CacheGroupContext newGrpCtx,
+ IntMap<LinkMap> linkMapByPart,
+ CacheDataStore oldCacheDataStore
+ ) throws IgniteCheckedException {
+ TreeIterator treeIter = new TreeIterator(pageSize);
+
+ checkCancellation();
+
+ int partId = oldCacheDataStore.partId();
+
+ PartitionContext partCtx = new PartitionContext(
+ workDir,
+ grpId,
+ partId,
+ partDataRegion,
+ mappingDataRegion,
+ oldGrpCtx,
+ newGrpCtx,
+ oldCacheDataStore,
+ pageStoreFactory
+ );
+
+ if (skipAlreadyDefragmentedPartition(workDir, grpId, partId, log)) {
+ partCtx.createPageStore(
+ () -> defragmentedPartMappingFile(workDir, partId).toPath(),
+ partCtx.mappingPagesAllocated,
+ partCtx.mappingPageMemory
+ );
+
+ linkMapByPart.put(partId, partCtx.createLinkMapTree(false));
+
+ return false;
+ }
+
+ partCtx.createPageStore(
+ () -> defragmentedPartMappingFile(workDir, partId).toPath(),
+ partCtx.mappingPagesAllocated,
+ partCtx.mappingPageMemory
+ );
+
+ linkMapByPart.put(partId, partCtx.createLinkMapTree(true));
+
+ checkCancellation();
+
+ partCtx.createPageStore(
+ () -> defragmentedPartTmpFile(workDir, partId).toPath(),
+ partCtx.partPagesAllocated,
+ partCtx.partPageMemory
+ );
+
+ partCtx.createNewCacheDataStore(offheap);
+
+ copyPartitionData(partCtx, treeIter);
+
+ DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partCtx.partPageMemory.pageManager();
+
+ PageStore oldPageStore = filePageStoreMgr.getStore(grpId, partId);
+
+ status.onPartitionDefragmented(
+ oldGrpCtx,
+ oldPageStore.size(),
+ pageSize + partCtx.partPagesAllocated.get() * pageSize // + file header.
+ );
+
+ //TODO Move inside of defragmentSinglePartition.
+ IgniteInClosure<IgniteInternalFuture<?>> cpLsnr = fut -> {
+ if (fut.error() == null) {
+ if (log.isDebugEnabled()) {
+ log.debug(S.toString(
+ "Partition defragmented",
+ "grpId", grpId, false,
+ "partId", partId, false,
+ "oldPages", oldPageStore.pages(), false,
+ "newPages", partCtx.partPagesAllocated.get() + 1, false,
+ "mappingPages", partCtx.mappingPagesAllocated.get() + 1, false,
+ "pageSize", pageSize, false,
+ "partFile", defragmentedPartFile(workDir, partId).getName(), false,
+ "workDir", workDir, false
+ ));
+ }
+
+ oldPageMem.invalidate(grpId, partId);
+
+ partCtx.partPageMemory.invalidate(grpId, partId);
+
+ pageMgr.pageStoreMap().removePageStore(grpId, partId); // Yes, it'll be invalid in a second.
+
+ renameTempPartitionFile(workDir, partId);
+ }
+ };
+
+ GridFutureAdapter<?> cpFut = defragmentationCheckpoint
+ .forceCheckpoint("partition defragmented", null)
+ .futureFor(CheckpointState.FINISHED);
+
+ cpFut.listen(cpLsnr);
+
+ cmpFut.add((IgniteInternalFuture<Object>)cpFut);
+
+ return true;
+ }
+
/** */
public IgniteInternalFuture<?> completionFuture() {
return completionFut.chain(future -> null);
@@ -839,7 +891,8 @@ public class CachePartitionDefragmentationManager {
(PageMemoryEx)partDataRegion.pageMemory(),
mappingByPartition,
cpLock,
- cancellationChecker
+ cancellationChecker,
+ defragmentationThreadPool
);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/PageStoreMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/PageStoreMap.java
index 946fea1..3939c87 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/PageStoreMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/PageStoreMap.java
@@ -39,9 +39,11 @@ class PageStoreMap implements PageStoreCollection {
) {
IntMap<PageStore> pageStoresMap = grpPageStoresMap.get(grpId);
- //This code cannot be used concurrently. If we decide to parallel defragmentation then we should correct current class.
- if (pageStoresMap == null)
- grpPageStoresMap.put(grpId, pageStoresMap = new IntRWHashMap<>());
+ if (pageStoresMap == null) {
+ grpPageStoresMap.putIfAbsent(grpId, new IntRWHashMap<>());
+
+ pageStoresMap = grpPageStoresMap.get(grpId);
+ }
pageStoresMap.put(partId, pageStore);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 117fdeb..792b161 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;
/**
@@ -502,7 +503,7 @@ public interface GridQueryIndexing {
* @param mappingByPart Mapping page memory.
* @param cpLock Defragmentation checkpoint read lock.
* @param cancellationChecker Cancellation checker.
- *
+ * @param defragmentationThreadPool Thread pool for defragmentation.
* @throws IgniteCheckedException If failed.
*/
void defragment(
@@ -511,6 +512,7 @@ public interface GridQueryIndexing {
PageMemoryEx partPageMem,
IntMap<LinkMap> mappingByPart,
CheckpointTimeoutLock cpLock,
- Runnable cancellationChecker
+ Runnable cancellationChecker,
+ IgniteThreadPoolExecutor defragmentationThreadPool
) throws IgniteCheckedException;
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
index 732a48a..cfa16ac 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;
/**
@@ -346,7 +347,8 @@ public class DummyQueryIndexing implements GridQueryIndexing {
PageMemoryEx partPageMem,
IntMap<LinkMap> mappingByPart,
CheckpointTimeoutLock cpLock,
- Runnable cancellationChecker
+ Runnable cancellationChecker,
+ IgniteThreadPoolExecutor defragmentationThreadPool
) throws IgniteCheckedException {
// No-op.
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 4a7513c..f443f6d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -189,6 +189,7 @@ import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.h2.api.ErrorCode;
import org.h2.api.JavaObjectSerializer;
import org.h2.engine.Session;
@@ -3210,8 +3211,18 @@ public class IgniteH2Indexing implements GridQueryIndexing {
PageMemoryEx partPageMem,
IntMap<LinkMap> mappingByPart,
CheckpointTimeoutLock cpLock,
- Runnable cancellationChecker
+ Runnable cancellationChecker,
+ IgniteThreadPoolExecutor defragmentationThreadPool
) throws IgniteCheckedException {
- defragmentation.defragment(grpCtx, newCtx, partPageMem, mappingByPart, cpLock, cancellationChecker, log);
+ defragmentation.defragment(
+ grpCtx,
+ newCtx,
+ partPageMem,
+ mappingByPart,
+ cpLock,
+ cancellationChecker,
+ log,
+ defragmentationThreadPool
+ );
}
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/defragmentation/IndexingDefragmentation.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/defragmentation/IndexingDefragmentation.java
index 050d381..d3a33e6 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/defragmentation/IndexingDefragmentation.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/defragmentation/IndexingDefragmentation.java
@@ -55,7 +55,9 @@ import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.processors.query.h2.opt.H2CacheRow;
import org.apache.ignite.internal.processors.query.h2.opt.H2Row;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.collection.IntMap;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.h2.index.Index;
import org.h2.value.Value;
@@ -81,6 +83,7 @@ public class IndexingDefragmentation {
* @param cpLock Defragmentation checkpoint read lock.
* @param cancellationChecker Cancellation checker.
* @param log Log.
+ * @param defragmentationThreadPool Thread pool for defragmentation.
*
* @throws IgniteCheckedException If failed.
*/
@@ -91,12 +94,11 @@ public class IndexingDefragmentation {
IntMap<LinkMap> mappingByPartition,
CheckpointTimeoutLock cpLock,
Runnable cancellationChecker,
- IgniteLogger log
+ IgniteLogger log,
+ IgniteThreadPoolExecutor defragmentationThreadPool
) throws IgniteCheckedException {
int pageSize = grpCtx.cacheObjectContext().kernalContext().grid().configuration().getDataStorageConfiguration().getPageSize();
- TreeIterator treeIterator = new TreeIterator(pageSize);
-
PageMemoryEx oldCachePageMem = (PageMemoryEx)grpCtx.dataRegion().pageMemory();
PageMemory newCachePageMemory = partPageMem;
@@ -105,106 +107,148 @@ public class IndexingDefragmentation {
long cpLockThreshold = 150L;
+ AtomicLong lastCpLockTs = new AtomicLong(System.currentTimeMillis());
+
+ IgniteUtils.doInParallel(
+ defragmentationThreadPool,
+ tables,
+ table -> defragmentTable(
+ grpCtx,
+ newCtx,
+ mappingByPartition,
+ cpLock,
+ cancellationChecker,
+ log,
+ pageSize,
+ oldCachePageMem,
+ newCachePageMemory,
+ cpLockThreshold,
+ lastCpLockTs,
+ table
+ )
+ );
+
+ if (log.isInfoEnabled())
+ log.info("Defragmentation indexes completed for group '" + grpCtx.groupId() + "'");
+ }
+
+ /**
+ * Defragment one given table.
+ */
+ private boolean defragmentTable(
+ CacheGroupContext grpCtx,
+ CacheGroupContext newCtx,
+ IntMap<LinkMap> mappingByPartition,
+ CheckpointTimeoutLock cpLock,
+ Runnable cancellationChecker,
+ IgniteLogger log,
+ int pageSize,
+ PageMemoryEx oldCachePageMem,
+ PageMemory newCachePageMemory,
+ long cpLockThreshold,
+ AtomicLong lastCpLockTs,
+ GridH2Table table
+ ) throws IgniteCheckedException {
cpLock.checkpointReadLock();
try {
- AtomicLong lastCpLockTs = new AtomicLong(System.currentTimeMillis());
+ TreeIterator treeIterator = new TreeIterator(pageSize);
- for (GridH2Table table : tables) {
- GridCacheContext<?, ?> cctx = table.cacheContext();
+ GridCacheContext<?, ?> cctx = table.cacheContext();
- if (cctx.groupId() != grpCtx.groupId())
- continue; // Not our index.
+ if (cctx.groupId() != grpCtx.groupId())
+ return false;
- cancellationChecker.run();
+ cancellationChecker.run();
- GridH2RowDescriptor rowDesc = table.rowDescriptor();
+ GridH2RowDescriptor rowDesc = table.rowDescriptor();
- List<Index> indexes = table.getIndexes();
- H2TreeIndex oldH2Idx = (H2TreeIndex)indexes.get(2);
+ List<Index> indexes = table.getIndexes();
+ H2TreeIndex oldH2Idx = (H2TreeIndex)indexes.get(2);
- int segments = oldH2Idx.segmentsCount();
+ int segments = oldH2Idx.segmentsCount();
- H2Tree firstTree = oldH2Idx.treeForRead(0);
+ H2Tree firstTree = oldH2Idx.treeForRead(0);
- PageIoResolver pageIoRslvr = pageAddr -> {
- PageIO io = PageIoResolver.DEFAULT_PAGE_IO_RESOLVER.resolve(pageAddr);
+ PageIoResolver pageIoRslvr = pageAddr -> {
+ PageIO io = PageIoResolver.DEFAULT_PAGE_IO_RESOLVER.resolve(pageAddr);
- if (io instanceof BPlusMetaIO)
- return io;
+ if (io instanceof BPlusMetaIO)
+ return io;
- //noinspection unchecked,rawtypes,rawtypes
- return wrap((BPlusIO)io);
- };
+ //noinspection unchecked,rawtypes,rawtypes
+ return wrap((BPlusIO)io);
+ };
- H2TreeIndex newIdx = H2TreeIndex.createIndex(
- cctx,
- null,
- table,
- oldH2Idx.getName(),
- firstTree.getPk(),
- firstTree.getAffinityKey(),
- Arrays.asList(firstTree.cols()),
- Arrays.asList(firstTree.cols()),
- oldH2Idx.inlineSize(),
- segments,
- newCachePageMemory,
- newCtx.offheap(),
- pageIoRslvr,
- log
- );
+ H2TreeIndex newIdx = H2TreeIndex.createIndex(
+ cctx,
+ null,
+ table,
+ oldH2Idx.getName(),
+ firstTree.getPk(),
+ firstTree.getAffinityKey(),
+ Arrays.asList(firstTree.cols()),
+ Arrays.asList(firstTree.cols()),
+ oldH2Idx.inlineSize(),
+ segments,
+ newCachePageMemory,
+ newCtx.offheap(),
+ pageIoRslvr,
+ log
+ );
- for (int i = 0; i < segments; i++) {
- H2Tree tree = oldH2Idx.treeForRead(i);
+ for (int i = 0; i < segments; i++) {
+ H2Tree tree = oldH2Idx.treeForRead(i);
- newIdx.treeForRead(i).enableSequentialWriteMode();
+ newIdx.treeForRead(i).enableSequentialWriteMode();
- treeIterator.iterate(tree, oldCachePageMem, (theTree, io, pageAddr, idx) -> {
- cancellationChecker.run();
+ treeIterator.iterate(tree, oldCachePageMem, (theTree, io, pageAddr, idx) -> {
+ cancellationChecker.run();
- if (System.currentTimeMillis() - lastCpLockTs.get() >= cpLockThreshold) {
- cpLock.checkpointReadUnlock();
+ if (System.currentTimeMillis() - lastCpLockTs.get() >= cpLockThreshold) {
+ cpLock.checkpointReadUnlock();
- cpLock.checkpointReadLock();
+ cpLock.checkpointReadLock();
- lastCpLockTs.set(System.currentTimeMillis());
- }
+ lastCpLockTs.set(System.currentTimeMillis());
+ }
- assert 1 == io.getVersion()
- : "IO version " + io.getVersion() + " is not supported by current defragmentation algorithm." +
- " Please implement copying of tree in a new format.";
+ assert 1 == io.getVersion()
+ : "IO version " + io.getVersion() + " is not supported by current defragmentation algorithm." +
+ " Please implement copying of tree in a new format.";
- BPlusIO<H2Row> h2IO = wrap(io);
+ BPlusIO<H2Row> h2IO = wrap(io);
- H2Row row = theTree.getRow(h2IO, pageAddr, idx);
+ H2Row row = theTree.getRow(h2IO, pageAddr, idx);
- if (row instanceof H2CacheRowWithIndex) {
- H2CacheRowWithIndex h2CacheRow = (H2CacheRowWithIndex)row;
+ if (row instanceof H2CacheRowWithIndex) {
+ H2CacheRowWithIndex h2CacheRow = (H2CacheRowWithIndex)row;
- CacheDataRow cacheDataRow = h2CacheRow.getRow();
+ CacheDataRow cacheDataRow = h2CacheRow.getRow();
- int partition = cacheDataRow.partition();
+ int partition = cacheDataRow.partition();
- long link = h2CacheRow.link();
+ long link = h2CacheRow.link();
- LinkMap map = mappingByPartition.get(partition);
+ LinkMap map = mappingByPartition.get(partition);
- long newLink = map.get(link);
+ long newLink = map.get(link);
- H2CacheRowWithIndex newRow = H2CacheRowWithIndex.create(
- rowDesc,
- newLink,
- h2CacheRow,
- ((H2RowLinkIO)io).storeMvccInfo()
- );
+ H2CacheRowWithIndex newRow = H2CacheRowWithIndex.create(
+ rowDesc,
+ newLink,
+ h2CacheRow,
+ ((H2RowLinkIO)io).storeMvccInfo()
+ );
- newIdx.putx(newRow);
- }
+ newIdx.putx(newRow);
+ }
- return true;
- });
- }
+ return true;
+ });
}
+
+ return true;
}
finally {
cpLock.checkpointReadUnlock();
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
index cf6b422..f768054 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.persistence;
import java.io.File;
+import java.util.Arrays;
import java.util.Collections;
import java.util.function.Function;
import org.apache.ignite.IgniteCache;
@@ -140,10 +141,13 @@ public class IgnitePdsIndexingDefragmentationTest extends IgnitePdsDefragmentati
long newIdxFileLen = new File(workDir, FilePageStoreManager.INDEX_FILE_NAME).length();
- assertTrue(newIdxFileLen <= oldIdxFileLen);
+ assertTrue(
+ "newIdxFileLen=" + newIdxFileLen + ", oldIdxFileLen=" + oldIdxFileLen,
+ newIdxFileLen <= oldIdxFileLen
+ );
File completionMarkerFile = DefragmentationFileUtils.defragmentationCompletionMarkerFile(workDir);
- assertTrue(completionMarkerFile.exists());
+ assertTrue(Arrays.toString(workDir.listFiles()), completionMarkerFile.exists());
stopGrid(0);