You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2020/12/23 10:54:00 UTC

[ignite] branch master updated: IGNITE-13720 Parallelism for defragmentation added. - Fixes #8574.

This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c2bcd3  IGNITE-13720 Parallelism for defragmentation added. - Fixes #8574.
5c2bcd3 is described below

commit 5c2bcd30ae907771b991bfd4a156e0a7fda6713c
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Wed Dec 23 13:46:12 2020 +0300

    IGNITE-13720 Parallelism for defragmentation added. - Fixes #8574.
    
    Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
 .../configuration/DataStorageConfiguration.java    |  30 +++
 .../GridCacheDatabaseSharedManager.java            |   3 +-
 .../CachePartitionDefragmentationManager.java      | 245 +++++++++++++--------
 .../persistence/defragmentation/PageStoreMap.java  |   8 +-
 .../processors/query/GridQueryIndexing.java        |   6 +-
 .../processors/query/DummyQueryIndexing.java       |   4 +-
 .../processors/query/h2/IgniteH2Indexing.java      |  15 +-
 .../defragmentation/IndexingDefragmentation.java   | 184 ++++++++++------
 .../IgnitePdsIndexingDefragmentationTest.java      |   8 +-
 9 files changed, 326 insertions(+), 177 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
index 2fe4824..6f5b338 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
@@ -179,6 +179,9 @@ public class DataStorageConfiguration implements Serializable {
     /** Default wal compaction level. */
     public static final int DFLT_WAL_COMPACTION_LEVEL = Deflater.BEST_SPEED;
 
+    /** Default defragmentation thread pool size. */
+    public static final int DFLT_DEFRAGMENTATION_THREAD_POOL_SIZE = 4;
+
     /** Default compression algorithm for WAL page snapshot records. */
     public static final DiskPageCompression DFLT_WAL_PAGE_COMPRESSION = DiskPageCompression.DISABLED;
 
@@ -320,6 +323,9 @@ public class DataStorageConfiguration implements Serializable {
     /** Encryption configuration. */
     private EncryptionConfiguration encCfg = new EncryptionConfiguration();
 
+    /** Maximum number of partitions which can be defragmented at the same time. */
+    private int defragmentationThreadPoolSize = DFLT_DEFRAGMENTATION_THREAD_POOL_SIZE;
+
     /**
      * Creates valid durable memory configuration with all default values.
      */
@@ -1173,6 +1179,30 @@ public class DataStorageConfiguration implements Serializable {
         return dfltWarmUpCfg;
     }
 
+    /**
+     * Sets maximum number of partitions which can be defragmented at the same time.
+     *
+     * @param defragmentationThreadPoolSize Maximum number of partitions which can be defragmented at the same time.
+     *      Default is {@link DataStorageConfiguration#DFLT_DEFRAGMENTATION_THREAD_POOL_SIZE}.
+     * @return {@code this} for chaining.
+     */
+    public DataStorageConfiguration setDefragmentationThreadPoolSize(int defragmentationThreadPoolSize) {
+        A.ensure(defragmentationThreadPoolSize >= 1, "Defragmentation thread pool size must be greater than or equal to 1.");
+
+        this.defragmentationThreadPoolSize = defragmentationThreadPoolSize;
+
+        return this;
+    }
+
+    /**
+     * Maximum number of partitions which can be defragmented at the same time.
+     *
+     * @return Thread pool size for defragmentation.
+     */
+    public int getDefragmentationThreadPoolSize() {
+        return defragmentationThreadPoolSize;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(DataStorageConfiguration.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 4429ec7..06e00f9 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -784,7 +784,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             (FilePageStoreManager)cctx.pageStore(),
             checkpointManager,
             lightCheckpointMgr,
-            persistenceCfg.getPageSize()
+            persistenceCfg.getPageSize(),
+            persistenceCfg.getDefragmentationThreadPoolSize()
         );
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
index 48616b6..b1682ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -71,8 +72,10 @@ import org.apache.ignite.internal.processors.cache.tree.PendingRow;
 import org.apache.ignite.internal.processors.query.GridQueryIndexing;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.collection.IntHashMap;
 import org.apache.ignite.internal.util.collection.IntMap;
+import org.apache.ignite.internal.util.collection.IntRWHashMap;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -82,6 +85,8 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.maintenance.MaintenanceRegistry;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+import org.jetbrains.annotations.Nullable;
 
 import static java.util.Comparator.comparing;
 import static java.util.stream.StreamSupport.stream;
@@ -158,6 +163,9 @@ public class CachePartitionDefragmentationManager {
     /** */
     private final GridFutureAdapter<?> completionFut = new GridFutureAdapter<>();
 
+    /** Defragmentation worker thread pool. If {@code null}, tasks are run in a single thread. */
+    @Nullable private volatile IgniteThreadPoolExecutor defragmentationThreadPool;
+
     /**
      * @param cacheNames Names of caches to be defragmented. Empty means "all".
      * @param sharedCtx Cache shared context.
@@ -174,7 +182,8 @@ public class CachePartitionDefragmentationManager {
         FilePageStoreManager filePageStoreMgr,
         CheckpointManager nodeCheckpoint,
         LightweightCheckpointManager defragmentationCheckpoint,
-        int pageSize
+        int pageSize,
+        int defragmentationThreadPoolSize
     ) throws IgniteCheckedException {
         cachesForDefragmentation = new HashSet<>(cacheNames);
 
@@ -190,6 +199,15 @@ public class CachePartitionDefragmentationManager {
 
         partDataRegion = dbMgr.dataRegion(DEFRAGMENTATION_PART_REGION_NAME);
         mappingDataRegion = dbMgr.dataRegion(DEFRAGMENTATION_MAPPING_REGION_NAME);
+
+        defragmentationThreadPool = new IgniteThreadPoolExecutor(
+            "defragmentation-worker",
+            sharedCtx.igniteInstanceName(),
+            defragmentationThreadPoolSize,
+            defragmentationThreadPoolSize,
+            30_000,
+            new LinkedBlockingQueue<Runnable>()
+        );
     }
 
     /** */
@@ -353,103 +371,24 @@ public class CachePartitionDefragmentationManager {
                         defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock();
                     }
 
-                    IntMap<LinkMap> linkMapByPart = new IntHashMap<>();
-
-                    for (CacheDataStore oldCacheDataStore : oldCacheDataStores) {
-                        checkCancellation();
-
-                        int partId = oldCacheDataStore.partId();
+                    IntMap<LinkMap> linkMapByPart = new IntRWHashMap<>();
 
-                        PartitionContext partCtx = new PartitionContext(
-                            workDir,
-                            grpId,
-                            partId,
-                            partDataRegion,
-                            mappingDataRegion,
+                    IgniteUtils.doInParallel(
+                        defragmentationThreadPool,
+                        oldCacheDataStores,
+                        oldCacheDataStore -> defragmentOnePartition(
                             oldGrpCtx,
+                            grpId,
+                            workDir,
+                            offheap,
+                            pageStoreFactory,
+                            cmpFut,
+                            oldPageMem,
                             newGrpCtx,
-                            cacheDataStores.get(partId),
-                            pageStoreFactory
-                        );
-
-                        if (skipAlreadyDefragmentedPartition(workDir, grpId, partId, log)) {
-                            partCtx.createPageStore(
-                                () -> defragmentedPartMappingFile(workDir, partId).toPath(),
-                                partCtx.mappingPagesAllocated,
-                                partCtx.mappingPageMemory
-                            );
-
-                            linkMapByPart.put(partId, partCtx.createLinkMapTree(false));
-
-                            continue;
-                        }
-
-                        partCtx.createPageStore(
-                            () -> defragmentedPartMappingFile(workDir, partId).toPath(),
-                            partCtx.mappingPagesAllocated,
-                            partCtx.mappingPageMemory
-                        );
-
-                        linkMapByPart.put(partId, partCtx.createLinkMapTree(true));
-
-                        checkCancellation();
-
-                        partCtx.createPageStore(
-                            () -> defragmentedPartTmpFile(workDir, partId).toPath(),
-                            partCtx.partPagesAllocated,
-                            partCtx.partPageMemory
-                        );
-
-                        partCtx.createNewCacheDataStore(offheap);
-
-                        copyPartitionData(partCtx, treeIter);
-
-                        DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partCtx.partPageMemory.pageManager();
-
-                        PageStore oldPageStore = filePageStoreMgr.getStore(grpId, partId);
-
-                        status.onPartitionDefragmented(
-                            oldGrpCtx,
-                            oldPageStore.size(),
-                            pageSize + partCtx.partPagesAllocated.get() * pageSize // + file header.
-                        );
-
-                        //TODO Move inside of defragmentSinglePartition.
-                        IgniteInClosure<IgniteInternalFuture<?>> cpLsnr = fut -> {
-                            if (fut.error() != null)
-                                return;
-
-                            if (log.isDebugEnabled()) {
-                                log.debug(S.toString(
-                                    "Partition defragmented",
-                                    "grpId", grpId, false,
-                                    "partId", partId, false,
-                                    "oldPages", oldPageStore.pages(), false,
-                                    "newPages", partCtx.partPagesAllocated.get() + 1, false,
-                                    "mappingPages", partCtx.mappingPagesAllocated.get() + 1, false,
-                                    "pageSize", pageSize, false,
-                                    "partFile", defragmentedPartFile(workDir, partId).getName(), false,
-                                    "workDir", workDir, false
-                                ));
-                            }
-
-                            oldPageMem.invalidate(grpId, partId);
-
-                            partCtx.partPageMemory.invalidate(grpId, partId);
-
-                            pageMgr.pageStoreMap().removePageStore(grpId, partId); // Yes, it'll be invalid in a second.
-
-                            renameTempPartitionFile(workDir, partId);
-                        };
-
-                        GridFutureAdapter<?> cpFut = defragmentationCheckpoint
-                            .forceCheckpoint("partition defragmented", null)
-                            .futureFor(CheckpointState.FINISHED);
-
-                        cpFut.listen(cpLsnr);
-
-                        cmpFut.add((IgniteInternalFuture<Object>)cpFut);
-                    }
+                            linkMapByPart,
+                            oldCacheDataStore
+                        )
+                    );
 
                     // A bit too general for now, but I like it more than saving only the last checkpoint future.
                     cmpFut.markInitialized().get();
@@ -551,6 +490,119 @@ public class CachePartitionDefragmentationManager {
         }
     }
 
+    /**
+     * Defragment one given partition.
+     */
+    private boolean defragmentOnePartition(
+        CacheGroupContext oldGrpCtx,
+        int grpId,
+        File workDir,
+        GridCacheOffheapManager offheap,
+        FilePageStoreFactory pageStoreFactory,
+        GridCompoundFuture<Object, Object> cmpFut,
+        PageMemoryEx oldPageMem,
+        CacheGroupContext newGrpCtx,
+        IntMap<LinkMap> linkMapByPart,
+        CacheDataStore oldCacheDataStore
+    ) throws IgniteCheckedException {
+        TreeIterator treeIter = new TreeIterator(pageSize);
+
+        checkCancellation();
+
+        int partId = oldCacheDataStore.partId();
+
+        PartitionContext partCtx = new PartitionContext(
+            workDir,
+            grpId,
+            partId,
+            partDataRegion,
+            mappingDataRegion,
+            oldGrpCtx,
+            newGrpCtx,
+            oldCacheDataStore,
+            pageStoreFactory
+        );
+
+        if (skipAlreadyDefragmentedPartition(workDir, grpId, partId, log)) {
+            partCtx.createPageStore(
+                () -> defragmentedPartMappingFile(workDir, partId).toPath(),
+                partCtx.mappingPagesAllocated,
+                partCtx.mappingPageMemory
+            );
+
+            linkMapByPart.put(partId, partCtx.createLinkMapTree(false));
+
+            return false;
+        }
+
+        partCtx.createPageStore(
+            () -> defragmentedPartMappingFile(workDir, partId).toPath(),
+            partCtx.mappingPagesAllocated,
+            partCtx.mappingPageMemory
+        );
+
+        linkMapByPart.put(partId, partCtx.createLinkMapTree(true));
+
+        checkCancellation();
+
+        partCtx.createPageStore(
+            () -> defragmentedPartTmpFile(workDir, partId).toPath(),
+            partCtx.partPagesAllocated,
+            partCtx.partPageMemory
+        );
+
+        partCtx.createNewCacheDataStore(offheap);
+
+        copyPartitionData(partCtx, treeIter);
+
+        DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partCtx.partPageMemory.pageManager();
+
+        PageStore oldPageStore = filePageStoreMgr.getStore(grpId, partId);
+
+        status.onPartitionDefragmented(
+            oldGrpCtx,
+            oldPageStore.size(),
+            pageSize + partCtx.partPagesAllocated.get() * pageSize // + file header.
+        );
+
+        //TODO Move inside of defragmentSinglePartition.
+        IgniteInClosure<IgniteInternalFuture<?>> cpLsnr = fut -> {
+            if (fut.error() == null) {
+                if (log.isDebugEnabled()) {
+                    log.debug(S.toString(
+                        "Partition defragmented",
+                        "grpId", grpId, false,
+                        "partId", partId, false,
+                        "oldPages", oldPageStore.pages(), false,
+                        "newPages", partCtx.partPagesAllocated.get() + 1, false,
+                        "mappingPages", partCtx.mappingPagesAllocated.get() + 1, false,
+                        "pageSize", pageSize, false,
+                        "partFile", defragmentedPartFile(workDir, partId).getName(), false,
+                        "workDir", workDir, false
+                    ));
+                }
+
+                oldPageMem.invalidate(grpId, partId);
+
+                partCtx.partPageMemory.invalidate(grpId, partId);
+
+                pageMgr.pageStoreMap().removePageStore(grpId, partId); // Yes, it'll be invalid in a second.
+
+                renameTempPartitionFile(workDir, partId);
+            }
+        };
+
+        GridFutureAdapter<?> cpFut = defragmentationCheckpoint
+            .forceCheckpoint("partition defragmented", null)
+            .futureFor(CheckpointState.FINISHED);
+
+        cpFut.listen(cpLsnr);
+
+        cmpFut.add((IgniteInternalFuture<Object>)cpFut);
+
+        return true;
+    }
+
     /** */
     public IgniteInternalFuture<?> completionFuture() {
         return completionFut.chain(future -> null);
@@ -839,7 +891,8 @@ public class CachePartitionDefragmentationManager {
             (PageMemoryEx)partDataRegion.pageMemory(),
             mappingByPartition,
             cpLock,
-            cancellationChecker
+            cancellationChecker,
+            defragmentationThreadPool
         );
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/PageStoreMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/PageStoreMap.java
index 946fea1..3939c87 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/PageStoreMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/PageStoreMap.java
@@ -39,9 +39,11 @@ class PageStoreMap implements PageStoreCollection {
     ) {
         IntMap<PageStore> pageStoresMap = grpPageStoresMap.get(grpId);
 
-        //This code cannot be used concurrently. If we decide to parallel defragmentation then we should correct current class.
-        if (pageStoresMap == null)
-            grpPageStoresMap.put(grpId, pageStoresMap = new IntRWHashMap<>());
+        if (pageStoresMap == null) {
+            grpPageStoresMap.putIfAbsent(grpId, new IntRWHashMap<>());
+
+            pageStoresMap = grpPageStoresMap.get(grpId);
+        }
 
         pageStoresMap.put(partId, pageStore);
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 117fdeb..792b161 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -502,7 +503,7 @@ public interface GridQueryIndexing {
      * @param mappingByPart Mapping page memory.
      * @param cpLock Defragmentation checkpoint read lock.
      * @param cancellationChecker Cancellation checker.
-     *
+     * @param defragmentationThreadPool Thread pool for defragmentation.
      * @throws IgniteCheckedException If failed.
      */
     void defragment(
@@ -511,6 +512,7 @@ public interface GridQueryIndexing {
         PageMemoryEx partPageMem,
         IntMap<LinkMap> mappingByPart,
         CheckpointTimeoutLock cpLock,
-        Runnable cancellationChecker
+        Runnable cancellationChecker,
+        IgniteThreadPoolExecutor defragmentationThreadPool
     ) throws IgniteCheckedException;
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
index 732a48a..cfa16ac 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -346,7 +347,8 @@ public class DummyQueryIndexing implements GridQueryIndexing {
         PageMemoryEx partPageMem,
         IntMap<LinkMap> mappingByPart,
         CheckpointTimeoutLock cpLock,
-        Runnable cancellationChecker
+        Runnable cancellationChecker,
+        IgniteThreadPoolExecutor defragmentationThreadPool
     ) throws IgniteCheckedException {
         // No-op.
     }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 4a7513c..f443f6d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -189,6 +189,7 @@ import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.h2.api.ErrorCode;
 import org.h2.api.JavaObjectSerializer;
 import org.h2.engine.Session;
@@ -3210,8 +3211,18 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         PageMemoryEx partPageMem,
         IntMap<LinkMap> mappingByPart,
         CheckpointTimeoutLock cpLock,
-        Runnable cancellationChecker
+        Runnable cancellationChecker,
+        IgniteThreadPoolExecutor defragmentationThreadPool
     ) throws IgniteCheckedException {
-        defragmentation.defragment(grpCtx, newCtx, partPageMem, mappingByPart, cpLock, cancellationChecker, log);
+        defragmentation.defragment(
+            grpCtx,
+            newCtx,
+            partPageMem,
+            mappingByPart,
+            cpLock,
+            cancellationChecker,
+            log,
+            defragmentationThreadPool
+        );
     }
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/defragmentation/IndexingDefragmentation.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/defragmentation/IndexingDefragmentation.java
index 050d381..d3a33e6 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/defragmentation/IndexingDefragmentation.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/defragmentation/IndexingDefragmentation.java
@@ -55,7 +55,9 @@ import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.processors.query.h2.opt.H2CacheRow;
 import org.apache.ignite.internal.processors.query.h2.opt.H2Row;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.collection.IntMap;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.h2.index.Index;
 import org.h2.value.Value;
 
@@ -81,6 +83,7 @@ public class IndexingDefragmentation {
      * @param cpLock Defragmentation checkpoint read lock.
      * @param cancellationChecker Cancellation checker.
      * @param log Log.
+     * @param defragmentationThreadPool Thread pool for defragmentation.
      *
      * @throws IgniteCheckedException If failed.
      */
@@ -91,12 +94,11 @@ public class IndexingDefragmentation {
         IntMap<LinkMap> mappingByPartition,
         CheckpointTimeoutLock cpLock,
         Runnable cancellationChecker,
-        IgniteLogger log
+        IgniteLogger log,
+        IgniteThreadPoolExecutor defragmentationThreadPool
     ) throws IgniteCheckedException {
         int pageSize = grpCtx.cacheObjectContext().kernalContext().grid().configuration().getDataStorageConfiguration().getPageSize();
 
-        TreeIterator treeIterator = new TreeIterator(pageSize);
-
         PageMemoryEx oldCachePageMem = (PageMemoryEx)grpCtx.dataRegion().pageMemory();
 
         PageMemory newCachePageMemory = partPageMem;
@@ -105,106 +107,148 @@ public class IndexingDefragmentation {
 
         long cpLockThreshold = 150L;
 
+        AtomicLong lastCpLockTs = new AtomicLong(System.currentTimeMillis());
+
+        IgniteUtils.doInParallel(
+            defragmentationThreadPool,
+            tables,
+            table -> defragmentTable(
+                grpCtx,
+                newCtx,
+                mappingByPartition,
+                cpLock,
+                cancellationChecker,
+                log,
+                pageSize,
+                oldCachePageMem,
+                newCachePageMemory,
+                cpLockThreshold,
+                lastCpLockTs,
+                table
+            )
+        );
+
+        if (log.isInfoEnabled())
+            log.info("Defragmentation of indexes completed for group '" + grpCtx.groupId() + "'");
+    }
+
+    /**
+     * Defragment one given table.
+     */
+    private boolean defragmentTable(
+        CacheGroupContext grpCtx,
+        CacheGroupContext newCtx,
+        IntMap<LinkMap> mappingByPartition,
+        CheckpointTimeoutLock cpLock,
+        Runnable cancellationChecker,
+        IgniteLogger log,
+        int pageSize,
+        PageMemoryEx oldCachePageMem,
+        PageMemory newCachePageMemory,
+        long cpLockThreshold,
+        AtomicLong lastCpLockTs,
+        GridH2Table table
+    ) throws IgniteCheckedException {
         cpLock.checkpointReadLock();
 
         try {
-            AtomicLong lastCpLockTs = new AtomicLong(System.currentTimeMillis());
+            TreeIterator treeIterator = new TreeIterator(pageSize);
 
-            for (GridH2Table table : tables) {
-                GridCacheContext<?, ?> cctx = table.cacheContext();
+            GridCacheContext<?, ?> cctx = table.cacheContext();
 
-                if (cctx.groupId() != grpCtx.groupId())
-                    continue; // Not our index.
+            if (cctx.groupId() != grpCtx.groupId())
+                return false;
 
-                cancellationChecker.run();
+            cancellationChecker.run();
 
-                GridH2RowDescriptor rowDesc = table.rowDescriptor();
+            GridH2RowDescriptor rowDesc = table.rowDescriptor();
 
-                List<Index> indexes = table.getIndexes();
-                H2TreeIndex oldH2Idx = (H2TreeIndex)indexes.get(2);
+            List<Index> indexes = table.getIndexes();
+            H2TreeIndex oldH2Idx = (H2TreeIndex)indexes.get(2);
 
-                int segments = oldH2Idx.segmentsCount();
+            int segments = oldH2Idx.segmentsCount();
 
-                H2Tree firstTree = oldH2Idx.treeForRead(0);
+            H2Tree firstTree = oldH2Idx.treeForRead(0);
 
-                PageIoResolver pageIoRslvr = pageAddr -> {
-                    PageIO io = PageIoResolver.DEFAULT_PAGE_IO_RESOLVER.resolve(pageAddr);
+            PageIoResolver pageIoRslvr = pageAddr -> {
+                PageIO io = PageIoResolver.DEFAULT_PAGE_IO_RESOLVER.resolve(pageAddr);
 
-                    if (io instanceof BPlusMetaIO)
-                        return io;
+                if (io instanceof BPlusMetaIO)
+                    return io;
 
-                    //noinspection unchecked,rawtypes,rawtypes
-                    return wrap((BPlusIO)io);
-                };
+                //noinspection unchecked,rawtypes,rawtypes
+                return wrap((BPlusIO)io);
+            };
 
-                H2TreeIndex newIdx = H2TreeIndex.createIndex(
-                    cctx,
-                    null,
-                    table,
-                    oldH2Idx.getName(),
-                    firstTree.getPk(),
-                    firstTree.getAffinityKey(),
-                    Arrays.asList(firstTree.cols()),
-                    Arrays.asList(firstTree.cols()),
-                    oldH2Idx.inlineSize(),
-                    segments,
-                    newCachePageMemory,
-                    newCtx.offheap(),
-                    pageIoRslvr,
-                    log
-                );
+            H2TreeIndex newIdx = H2TreeIndex.createIndex(
+                cctx,
+                null,
+                table,
+                oldH2Idx.getName(),
+                firstTree.getPk(),
+                firstTree.getAffinityKey(),
+                Arrays.asList(firstTree.cols()),
+                Arrays.asList(firstTree.cols()),
+                oldH2Idx.inlineSize(),
+                segments,
+                newCachePageMemory,
+                newCtx.offheap(),
+                pageIoRslvr,
+                log
+            );
 
-                for (int i = 0; i < segments; i++) {
-                    H2Tree tree = oldH2Idx.treeForRead(i);
+            for (int i = 0; i < segments; i++) {
+                H2Tree tree = oldH2Idx.treeForRead(i);
 
-                    newIdx.treeForRead(i).enableSequentialWriteMode();
+                newIdx.treeForRead(i).enableSequentialWriteMode();
 
-                    treeIterator.iterate(tree, oldCachePageMem, (theTree, io, pageAddr, idx) -> {
-                        cancellationChecker.run();
+                treeIterator.iterate(tree, oldCachePageMem, (theTree, io, pageAddr, idx) -> {
+                    cancellationChecker.run();
 
-                        if (System.currentTimeMillis() - lastCpLockTs.get() >= cpLockThreshold) {
-                            cpLock.checkpointReadUnlock();
+                    if (System.currentTimeMillis() - lastCpLockTs.get() >= cpLockThreshold) {
+                        cpLock.checkpointReadUnlock();
 
-                            cpLock.checkpointReadLock();
+                        cpLock.checkpointReadLock();
 
-                            lastCpLockTs.set(System.currentTimeMillis());
-                        }
+                        lastCpLockTs.set(System.currentTimeMillis());
+                    }
 
-                        assert 1 == io.getVersion()
-                            : "IO version " + io.getVersion() + " is not supported by current defragmentation algorithm." +
-                            " Please implement copying of tree in a new format.";
+                    assert 1 == io.getVersion()
+                        : "IO version " + io.getVersion() + " is not supported by current defragmentation algorithm." +
+                        " Please implement copying of tree in a new format.";
 
-                        BPlusIO<H2Row> h2IO = wrap(io);
+                    BPlusIO<H2Row> h2IO = wrap(io);
 
-                        H2Row row = theTree.getRow(h2IO, pageAddr, idx);
+                    H2Row row = theTree.getRow(h2IO, pageAddr, idx);
 
-                        if (row instanceof H2CacheRowWithIndex) {
-                            H2CacheRowWithIndex h2CacheRow = (H2CacheRowWithIndex)row;
+                    if (row instanceof H2CacheRowWithIndex) {
+                        H2CacheRowWithIndex h2CacheRow = (H2CacheRowWithIndex)row;
 
-                            CacheDataRow cacheDataRow = h2CacheRow.getRow();
+                        CacheDataRow cacheDataRow = h2CacheRow.getRow();
 
-                            int partition = cacheDataRow.partition();
+                        int partition = cacheDataRow.partition();
 
-                            long link = h2CacheRow.link();
+                        long link = h2CacheRow.link();
 
-                            LinkMap map = mappingByPartition.get(partition);
+                        LinkMap map = mappingByPartition.get(partition);
 
-                            long newLink = map.get(link);
+                        long newLink = map.get(link);
 
-                            H2CacheRowWithIndex newRow = H2CacheRowWithIndex.create(
-                                rowDesc,
-                                newLink,
-                                h2CacheRow,
-                                ((H2RowLinkIO)io).storeMvccInfo()
-                            );
+                        H2CacheRowWithIndex newRow = H2CacheRowWithIndex.create(
+                            rowDesc,
+                            newLink,
+                            h2CacheRow,
+                            ((H2RowLinkIO)io).storeMvccInfo()
+                        );
 
-                            newIdx.putx(newRow);
-                        }
+                        newIdx.putx(newRow);
+                    }
 
-                        return true;
-                    });
-                }
+                    return true;
+                });
             }
+
+            return true;
         }
         finally {
             cpLock.checkpointReadUnlock();
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
index cf6b422..f768054 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.persistence;
 
 import java.io.File;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.function.Function;
 import org.apache.ignite.IgniteCache;
@@ -140,10 +141,13 @@ public class IgnitePdsIndexingDefragmentationTest extends IgnitePdsDefragmentati
 
         long newIdxFileLen = new File(workDir, FilePageStoreManager.INDEX_FILE_NAME).length();
 
-        assertTrue(newIdxFileLen <= oldIdxFileLen);
+        assertTrue(
+            "newIdxFileLen=" + newIdxFileLen + ", oldIdxFileLen=" + oldIdxFileLen,
+            newIdxFileLen <= oldIdxFileLen
+        );
 
         File completionMarkerFile = DefragmentationFileUtils.defragmentationCompletionMarkerFile(workDir);
-        assertTrue(completionMarkerFile.exists());
+        assertTrue(Arrays.toString(workDir.listFiles()), completionMarkerFile.exists());
 
         stopGrid(0);