You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2020/12/09 07:06:03 UTC

[ignite] branch master updated: IGNITE-13709 Control.sh API - status command for defragmentation feature - Fixes #8548.

This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 493759a  IGNITE-13709 Control.sh API - status command for defragmentation feature - Fixes #8548.
493759a is described below

commit 493759a5aa43d2a026b56552cf26cd72e0ed96a9
Author: ibessonov <be...@gmail.com>
AuthorDate: Wed Dec 9 09:54:58 2020 +0300

    IGNITE-13709 Control.sh API - status command for defragmentation feature - Fixes #8548.
    
    Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
 .../commandline/DefragmentationCommand.java        |  13 +-
 .../GridCommandHandlerDefragmentationTest.java     | 141 ++++-
 .../CachePartitionDefragmentationManager.java      | 577 +++++++++++++++------
 3 files changed, 564 insertions(+), 167 deletions(-)

diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java
index c2fa8e9..ec5c1f0 100644
--- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java
+++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java
@@ -111,7 +111,7 @@ public class DefragmentationCommand implements Command<DefragmentationArguments>
     @Override public void parseArguments(CommandArgIterator argIter) {
         DefragmentationSubcommands cmd = DefragmentationSubcommands.of(argIter.nextArg("Expected defragmentation subcommand."));
 
-        if (cmd == null || cmd == DefragmentationSubcommands.STATUS) // Status subcommand is not yet completed.
+        if (cmd == null)
             throw new IllegalArgumentException("Expected correct defragmentation subcommand.");
 
         args = new DefragmentationArguments(cmd);
@@ -124,10 +124,17 @@ public class DefragmentationCommand implements Command<DefragmentationArguments>
                 String subarg;
 
                 do {
-                    subarg = argIter.nextArg("Expected one of subcommand arguments.").toLowerCase(Locale.ENGLISH);
+                    subarg = argIter.peekNextArg();
+
+                    if (subarg == null)
+                        break;
+
+                    subarg = subarg.toLowerCase(Locale.ENGLISH);
 
                     switch (subarg) {
                         case NODES_ARG: {
+                            argIter.nextArg("");
+
                             Set<String> ids = argIter.nextStringSet(NODES_ARG);
 
                             if (ids.isEmpty())
@@ -139,6 +146,8 @@ public class DefragmentationCommand implements Command<DefragmentationArguments>
                         }
 
                         case CACHES_ARG: {
+                            argIter.nextArg("");
+
                             Set<String> ids = argIter.nextStringSet(CACHES_ARG);
 
                             if (ids.isEmpty())
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerDefragmentationTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerDefragmentationTest.java
index adce9e2..c2ea3c2 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerDefragmentationTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerDefragmentationTest.java
@@ -17,13 +17,16 @@
 
 package org.apache.ignite.util;
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.function.UnaryOperator;
 import java.util.logging.Formatter;
 import java.util.logging.LogRecord;
 import java.util.logging.Logger;
 import java.util.logging.StreamHandler;
+import java.util.regex.Pattern;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cluster.ClusterState;
@@ -33,6 +36,7 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.commandline.CommandHandler;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance.DefragmentationParameters;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import org.apache.ignite.maintenance.MaintenanceTask;
@@ -50,6 +54,9 @@ public class GridCommandHandlerDefragmentationTest extends GridCommandHandlerClu
     /** */
     private static CountDownLatch blockCdl;
 
+    /** */
+    private static CountDownLatch waitCdl;
+
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
@@ -205,7 +212,9 @@ public class GridCommandHandlerDefragmentationTest extends GridCommandHandlerClu
         assertTrue(logLsnr.check());
     }
 
-    /** */
+    /**
+     * @throws Exception If failed.
+     */
     @Test
     public void testDefragmentationCancelInProgress() throws Exception {
         IgniteEx ig = startGrid(0);
@@ -311,6 +320,136 @@ public class GridCommandHandlerDefragmentationTest extends GridCommandHandlerClu
         assertTrue(logLsnr.check());
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testDefragmentationStatus() throws Exception {
+        IgniteEx ig = startGrid(0);
+
+        ig.cluster().state(ClusterState.ACTIVE);
+
+        ig.getOrCreateCache(DEFAULT_CACHE_NAME + "1");
+
+        IgniteCache<Object, Object> cache = ig.getOrCreateCache(DEFAULT_CACHE_NAME + "2");
+
+        ig.getOrCreateCache(DEFAULT_CACHE_NAME + "3");
+
+        for (int i = 0; i < 1024; i++)
+            cache.put(i, i);
+
+        forceCheckpoint(ig);
+
+        String grid0ConsId = ig.configuration().getConsistentId().toString();
+
+        ListeningTestLogger testLog = new ListeningTestLogger();
+
+        CommandHandler cmd = createCommandHandler(testLog);
+
+        assertEquals(EXIT_CODE_OK, execute(
+            cmd,
+            "--defragmentation",
+            "schedule",
+            "--nodes",
+            grid0ConsId
+        ));
+
+        String port = grid(0).localNode().attribute(IgniteNodeAttributes.ATTR_REST_TCP_PORT).toString();
+
+        stopGrid(0);
+
+        blockCdl = new CountDownLatch(128);
+        waitCdl = new CountDownLatch(1);
+
+        UnaryOperator<IgniteConfiguration> cfgOp = cfg -> {
+            DataStorageConfiguration dsCfg = cfg.getDataStorageConfiguration();
+
+            FileIOFactory delegate = dsCfg.getFileIOFactory();
+
+            dsCfg.setFileIOFactory((file, modes) -> {
+                if (file.getName().contains("dfrg")) {
+                    if (blockCdl.getCount() == 0) {
+                        try {
+                            waitCdl.await();
+                        }
+                        catch (InterruptedException ignore) {
+                            // No-op.
+                        }
+                    }
+                    else
+                        blockCdl.countDown();
+                }
+
+                return delegate.create(file, modes);
+            });
+
+            return cfg;
+        };
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
+            try {
+                startGrid(0, cfgOp);
+            }
+            catch (Exception e) {
+                // Rethrow wrapped so that the node startup failure is not lost.
+                throw new RuntimeException(e);
+            }
+        });
+
+        blockCdl.await();
+
+        List<LogListener> logLsnrs = Arrays.asList(
+            LogListener.matches("default1 - size before/after: 0MB/0MB").build(),
+            LogListener.matches("default2 - partitions processed/all:").build(),
+            LogListener.matches("Awaiting defragmentation: default3").build()
+        );
+
+        for (LogListener logLsnr : logLsnrs)
+            testLog.registerListener(logLsnr);
+
+        assertEquals(EXIT_CODE_OK, execute(
+            cmd,
+            "--port",
+            port,
+            "--defragmentation",
+            "status"
+        ));
+
+        waitCdl.countDown();
+
+        for (LogListener logLsnr : logLsnrs)
+            assertTrue(logLsnr.check());
+
+        fut.get();
+
+        ((GridCacheDatabaseSharedManager)grid(0).context().cache().context().database())
+            .defragmentationManager()
+            .completionFuture()
+            .get();
+
+        testLog.clearListeners();
+
+        logLsnrs = Arrays.asList(
+            LogListener.matches("default1 - size before/after: 0MB/0MB").build(),
+            LogListener.matches(Pattern.compile("default2 - size before/after: (\\S+)/\\1")).build(),
+            LogListener.matches("default3 - size before/after: 0MB/0MB").build()
+        );
+
+        for (LogListener logLsnr : logLsnrs)
+            testLog.registerListener(logLsnr);
+
+        assertEquals(EXIT_CODE_OK, execute(
+            cmd,
+            "--port",
+            port,
+            "--defragmentation",
+            "status"
+        ));
+
+        for (LogListener logLsnr : logLsnrs)
+            assertTrue(logLsnr.check());
+    }
+
     /** */
     private CommandHandler createCommandHandler(ListeningTestLogger testLog) {
         Logger log = CommandHandler.initLogger(null);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
index 41999fb..75b3458 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
@@ -19,11 +19,16 @@ package org.apache.ignite.internal.processors.cache.persistence.defragmentation;
 
 import java.io.File;
 import java.nio.file.Path;
+import java.text.DecimalFormat;
+import java.text.DecimalFormatSymbols;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -66,6 +71,7 @@ import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
 import org.apache.ignite.internal.processors.cache.tree.PendingRow;
 import org.apache.ignite.internal.processors.query.GridQueryIndexing;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.util.GridAtomicLong;
 import org.apache.ignite.internal.util.collection.IntHashMap;
 import org.apache.ignite.internal.util.collection.IntMap;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
@@ -78,9 +84,11 @@ import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.maintenance.MaintenanceRegistry;
 
+import static java.util.Comparator.comparing;
 import static java.util.stream.StreamSupport.stream;
 import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA;
 import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
 import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
 import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DEFRAGMENTATION_MAPPING_REGION_NAME;
 import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DEFRAGMENTATION_PART_REGION_NAME;
@@ -105,6 +113,9 @@ public class CachePartitionDefragmentationManager {
     /** */
     private final Set<String> cachesForDefragmentation;
 
+    /** */
+    private final Set<CacheGroupContext> cacheGrpCtxsForDefragmentation = new TreeSet<>(comparing(CacheGroupContext::cacheOrGroupName));
+
     /** Cache shared context. */
     private final GridCacheSharedContext<?, ?> sharedCtx;
 
@@ -142,6 +153,9 @@ public class CachePartitionDefragmentationManager {
     private final AtomicBoolean cancel = new AtomicBoolean();
 
     /** */
+    private final DefragmentationStatus status = new DefragmentationStatus();
+
+    /** */
     private final GridFutureAdapter<?> completionFut = new GridFutureAdapter<>();
 
     /**
@@ -190,11 +204,23 @@ public class CachePartitionDefragmentationManager {
         dbMgr.preserveWalTailPointer();
 
         sharedCtx.wal().onDeActivate(sharedCtx.kernalContext());
+
+        for (CacheGroupContext oldGrpCtx : sharedCtx.cache().cacheGroups()) {
+            if (!oldGrpCtx.userCache() || cacheGrpCtxsForDefragmentation.contains(oldGrpCtx))
+                continue;
+
+            if (!cachesForDefragmentation.isEmpty()) {
+                if (oldGrpCtx.caches().stream().noneMatch(cctx -> cachesForDefragmentation.contains(cctx.name())))
+                    continue;
+            }
+
+            cacheGrpCtxsForDefragmentation.add(oldGrpCtx);
+        }
     }
 
     /** */
     public void executeDefragmentation() throws IgniteCheckedException {
-        log.info("Defragmentation started.");
+        status.onStart(cacheGrpCtxsForDefragmentation);
 
         try {
             // Now the actual process starts.
@@ -203,23 +229,18 @@ public class CachePartitionDefragmentationManager {
             IgniteInternalFuture<?> idxDfrgFut = null;
             DataPageEvictionMode prevPageEvictionMode = null;
 
-            for (CacheGroupContext oldGrpCtx : sharedCtx.cache().cacheGroups()) {
-                if (!oldGrpCtx.userCache())
-                    continue;
-
+            for (CacheGroupContext oldGrpCtx : cacheGrpCtxsForDefragmentation) {
                 int grpId = oldGrpCtx.groupId();
 
-                if (!cachesForDefragmentation.isEmpty()) {
-                    if (oldGrpCtx.caches().stream().noneMatch(cctx -> cachesForDefragmentation.contains(cctx.name())))
-                        continue;
-                }
-
                 File workDir = filePageStoreMgr.cacheWorkDir(oldGrpCtx.sharedGroup(), oldGrpCtx.cacheOrGroupName());
 
-                try {
-                    if (skipAlreadyDefragmentedCacheGroup(workDir, grpId, log))
-                        continue;
+                if (skipAlreadyDefragmentedCacheGroup(workDir, grpId, log)) {
+                    status.onCacheGroupSkipped(oldGrpCtx);
 
+                    continue;
+                }
+
+                try {
                     GridCacheOffheapManager offheap = (GridCacheOffheapManager)oldGrpCtx.offheap();
 
                     List<CacheDataStore> oldCacheDataStores = stream(offheap.cacheDataStores().spliterator(), false)
@@ -233,226 +254,243 @@ public class CachePartitionDefragmentationManager {
                         })
                         .collect(Collectors.toList());
 
-                    if (workDir != null && !oldCacheDataStores.isEmpty()) {
-                        // We can't start defragmentation of new group on the region that has wrong eviction mode.
-                        // So waiting of the previous cache group defragmentation is inevitable.
-                        DataPageEvictionMode curPageEvictionMode = oldGrpCtx.dataRegion().config().getPageEvictionMode();
+                    status.onCacheGroupStart(oldGrpCtx, oldCacheDataStores.size());
 
-                        if (prevPageEvictionMode == null || prevPageEvictionMode != curPageEvictionMode) {
-                            prevPageEvictionMode = curPageEvictionMode;
+                    if (workDir == null || oldCacheDataStores.isEmpty()) {
+                        status.onCacheGroupFinish(oldGrpCtx);
 
-                            partDataRegion.config().setPageEvictionMode(curPageEvictionMode);
+                        continue;
+                    }
 
-                            if (idxDfrgFut != null)
-                                idxDfrgFut.get();
-                        }
+                    // We can't start defragmentation of a new group on a region that has the wrong eviction mode.
+                    // So waiting for the previous cache group's index defragmentation to finish is inevitable.
+                    DataPageEvictionMode curPageEvictionMode = oldGrpCtx.dataRegion().config().getPageEvictionMode();
 
-                        IntMap<CacheDataStore> cacheDataStores = new IntHashMap<>();
+                    if (prevPageEvictionMode == null || prevPageEvictionMode != curPageEvictionMode) {
+                        prevPageEvictionMode = curPageEvictionMode;
 
-                        for (CacheDataStore store : offheap.cacheDataStores()) {
-                            // Tree can be null for not yet initialized partitions.
-                            // This would mean that these partitions are empty.
-                            assert store.tree() == null || store.tree().groupId() == grpId;
+                        partDataRegion.config().setPageEvictionMode(curPageEvictionMode);
 
-                            if (store.tree() != null)
-                                cacheDataStores.put(store.partId(), store);
-                        }
+                        if (idxDfrgFut != null)
+                            idxDfrgFut.get();
+                    }
 
-                        dbMgr.checkpointedDataRegions().remove(oldGrpCtx.dataRegion());
+                    IntMap<CacheDataStore> cacheDataStores = new IntHashMap<>();
 
-                        // Another cheat. Ttl cleanup manager knows too much shit.
-                        oldGrpCtx.caches().stream()
-                            .filter(cacheCtx -> cacheCtx.groupId() == grpId)
-                            .forEach(cacheCtx -> cacheCtx.ttl().unregister());
+                    for (CacheDataStore store : offheap.cacheDataStores()) {
+                        // Tree can be null for not yet initialized partitions.
+                        // This would mean that these partitions are empty.
+                        assert store.tree() == null || store.tree().groupId() == grpId;
 
-                        // Technically wal is already disabled, but "PageHandler.isWalDeltaRecordNeeded" doesn't care
-                        // and WAL records will be allocated anyway just to be ignored later if we don't disable WAL for
-                        // cache group explicitly.
-                        oldGrpCtx.localWalEnabled(false, false);
+                        if (store.tree() != null)
+                            cacheDataStores.put(store.partId(), store);
+                    }
 
-                        boolean encrypted = oldGrpCtx.config().isEncryptionEnabled();
+                    dbMgr.checkpointedDataRegions().remove(oldGrpCtx.dataRegion());
 
-                        FilePageStoreFactory pageStoreFactory = filePageStoreMgr.getPageStoreFactory(grpId, encrypted);
+                    // Another cheat. Ttl cleanup manager knows too much, so caches are unregistered from it explicitly.
+                    oldGrpCtx.caches().stream()
+                        .filter(cacheCtx -> cacheCtx.groupId() == grpId)
+                        .forEach(cacheCtx -> cacheCtx.ttl().unregister());
 
-                        createIndexPageStore(grpId, workDir, pageStoreFactory, partDataRegion, val -> {
-                        }); //TODO Allocated tracker.
+                    // Technically wal is already disabled, but "PageHandler.isWalDeltaRecordNeeded" doesn't care
+                    // and WAL records will be allocated anyway just to be ignored later if we don't disable WAL for
+                    // cache group explicitly.
+                    oldGrpCtx.localWalEnabled(false, false);
 
-                        checkCancellation();
+                    boolean encrypted = oldGrpCtx.config().isEncryptionEnabled();
 
-                        GridCompoundFuture<Object, Object> cmpFut = new GridCompoundFuture<>();
+                    FilePageStoreFactory pageStoreFactory = filePageStoreMgr.getPageStoreFactory(grpId, encrypted);
 
-                        PageMemoryEx oldPageMem = (PageMemoryEx)oldGrpCtx.dataRegion().pageMemory();
+                    AtomicLong idxAllocationTracker = new GridAtomicLong();
 
-                        CacheGroupContext newGrpCtx = new CacheGroupContext(
-                            sharedCtx,
-                            grpId,
-                            oldGrpCtx.receivedFrom(),
-                            CacheType.USER,
-                            oldGrpCtx.config(),
-                            oldGrpCtx.affinityNode(),
-                            partDataRegion,
-                            oldGrpCtx.cacheObjectContext(),
-                            null,
-                            null,
-                            oldGrpCtx.localStartVersion(),
-                            true,
-                            false,
-                            true
-                        );
+                    createIndexPageStore(grpId, workDir, pageStoreFactory, partDataRegion, idxAllocationTracker::addAndGet);
 
-                        defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadLock();
+                    checkCancellation();
 
-                        try {
-                            // This will initialize partition meta in index partition - meta tree and reuse list.
-                            newGrpCtx.start();
-                        }
-                        finally {
-                            defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock();
-                        }
+                    GridCompoundFuture<Object, Object> cmpFut = new GridCompoundFuture<>();
 
-                        IntMap<LinkMap> linkMapByPart = new IntHashMap<>();
+                    PageMemoryEx oldPageMem = (PageMemoryEx)oldGrpCtx.dataRegion().pageMemory();
 
-                        for (CacheDataStore oldCacheDataStore : oldCacheDataStores) {
-                            checkCancellation();
+                    CacheGroupContext newGrpCtx = new CacheGroupContext(
+                        sharedCtx,
+                        grpId,
+                        oldGrpCtx.receivedFrom(),
+                        CacheType.USER,
+                        oldGrpCtx.config(),
+                        oldGrpCtx.affinityNode(),
+                        partDataRegion,
+                        oldGrpCtx.cacheObjectContext(),
+                        null,
+                        null,
+                        oldGrpCtx.localStartVersion(),
+                        true,
+                        false,
+                        true
+                    );
 
-                            int partId = oldCacheDataStore.partId();
+                    defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadLock();
 
-                            PartitionContext partCtx = new PartitionContext(
-                                workDir,
-                                grpId,
-                                partId,
-                                partDataRegion,
-                                mappingDataRegion,
-                                oldGrpCtx,
-                                newGrpCtx,
-                                cacheDataStores.get(partId),
-                                pageStoreFactory
-                            );
+                    try {
+                        // This will initialize partition meta in index partition - meta tree and reuse list.
+                        newGrpCtx.start();
+                    }
+                    finally {
+                        defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock();
+                    }
 
-                            if (skipAlreadyDefragmentedPartition(workDir, grpId, partId, log)) {
-                                partCtx.createPageStore(
-                                    () -> defragmentedPartMappingFile(workDir, partId).toPath(),
-                                    partCtx.mappingPagesAllocated,
-                                    partCtx.mappingPageMemory
-                                );
+                    IntMap<LinkMap> linkMapByPart = new IntHashMap<>();
 
-                                linkMapByPart.put(partId, partCtx.createLinkMapTree(false));
+                    for (CacheDataStore oldCacheDataStore : oldCacheDataStores) {
+                        checkCancellation();
 
-                                continue;
-                            }
+                        int partId = oldCacheDataStore.partId();
+
+                        PartitionContext partCtx = new PartitionContext(
+                            workDir,
+                            grpId,
+                            partId,
+                            partDataRegion,
+                            mappingDataRegion,
+                            oldGrpCtx,
+                            newGrpCtx,
+                            cacheDataStores.get(partId),
+                            pageStoreFactory
+                        );
 
+                        if (skipAlreadyDefragmentedPartition(workDir, grpId, partId, log)) {
                             partCtx.createPageStore(
                                 () -> defragmentedPartMappingFile(workDir, partId).toPath(),
                                 partCtx.mappingPagesAllocated,
                                 partCtx.mappingPageMemory
                             );
 
-                            linkMapByPart.put(partId, partCtx.createLinkMapTree(true));
+                            linkMapByPart.put(partId, partCtx.createLinkMapTree(false));
 
-                            checkCancellation();
+                            continue;
+                        }
 
-                            partCtx.createPageStore(
-                                () -> defragmentedPartTmpFile(workDir, partId).toPath(),
-                                partCtx.partPagesAllocated,
-                                partCtx.partPageMemory
-                            );
+                        partCtx.createPageStore(
+                            () -> defragmentedPartMappingFile(workDir, partId).toPath(),
+                            partCtx.mappingPagesAllocated,
+                            partCtx.mappingPageMemory
+                        );
+
+                        linkMapByPart.put(partId, partCtx.createLinkMapTree(true));
+
+                        checkCancellation();
+
+                        partCtx.createPageStore(
+                            () -> defragmentedPartTmpFile(workDir, partId).toPath(),
+                            partCtx.partPagesAllocated,
+                            partCtx.partPageMemory
+                        );
 
                             partCtx.createNewCacheDataStore(offheap);
 
                             copyPartitionData(partCtx, treeIter);
 
-                            IgniteInClosure<IgniteInternalFuture<?>> cpLsnr = fut -> {
-                                if (fut.error() != null)
-                                    return;
+                        DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partCtx.partPageMemory.pageManager();
 
-                                PageStore oldPageStore = null;
+                        PageStore oldPageStore = filePageStoreMgr.getStore(grpId, partId);
 
-                                try {
-                                    oldPageStore = filePageStoreMgr.getStore(grpId, partId);
-                                }
-                                catch (IgniteCheckedException ignore) {
-                                }
+                        status.onPartitionDefragmented(
+                            oldGrpCtx,
+                            oldPageStore.size(),
+                            pageSize + partCtx.partPagesAllocated.get() * pageSize // + file header.
+                        );
 
-                                assert oldPageStore != null;
+                        //TODO Move inside of defragmentSinglePartition.
+                        IgniteInClosure<IgniteInternalFuture<?>> cpLsnr = fut -> {
+                            if (fut.error() != null)
+                                return;
+
+                            if (log.isDebugEnabled()) {
+                                log.debug(S.toString(
+                                    "Partition defragmented",
+                                    "grpId", grpId, false,
+                                    "partId", partId, false,
+                                    "oldPages", oldPageStore.pages(), false,
+                                    "newPages", partCtx.partPagesAllocated.get() + 1, false,
+                                    "mappingPages", partCtx.mappingPagesAllocated.get() + 1, false,
+                                    "pageSize", pageSize, false,
+                                    "partFile", defragmentedPartFile(workDir, partId).getName(), false,
+                                    "workDir", workDir, false
+                                ));
+                            }
 
-                                if (log.isDebugEnabled()) {
-                                    log.debug(S.toString(
-                                        "Partition defragmented",
-                                        "grpId", grpId, false,
-                                        "partId", partId, false,
-                                        "oldPages", oldPageStore.pages(), false,
-                                        "newPages", partCtx.partPagesAllocated.get() + 1, false,
-                                        "mappingPages", partCtx.mappingPagesAllocated.get() + 1, false,
-                                        "pageSize", pageSize, false,
-                                        "partFile", defragmentedPartFile(workDir, partId).getName(), false,
-                                        "workDir", workDir, false
-                                    ));
-                                }
+                            oldPageMem.invalidate(grpId, partId);
 
-                                oldPageMem.invalidate(grpId, partId);
+                            partCtx.partPageMemory.invalidate(grpId, partId);
 
-                                partCtx.partPageMemory.invalidate(grpId, partId);
+                            pageMgr.pageStoreMap().removePageStore(grpId, partId); // Yes, it'll be invalid in a second.
 
-                                DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partCtx.partPageMemory.pageManager();
+                            renameTempPartitionFile(workDir, partId);
+                        };
 
-                                pageMgr.pageStoreMap().removePageStore(grpId, partId); // Yes, it'll be invalid in a second.
+                        GridFutureAdapter<?> cpFut = defragmentationCheckpoint
+                            .forceCheckpoint("partition defragmented", null)
+                            .futureFor(CheckpointState.FINISHED);
 
-                                renameTempPartitionFile(workDir, partId);
-                            };
+                        cpFut.listen(cpLsnr);
 
-                            GridFutureAdapter<?> cpFut = defragmentationCheckpoint
-                                .forceCheckpoint("partition defragmented", null)
-                                .futureFor(CheckpointState.FINISHED);
+                        cmpFut.add((IgniteInternalFuture<Object>)cpFut);
+                    }
 
-                            cpFut.listen(cpLsnr);
+                    // A bit too general for now, but I like it more than saving only the last checkpoint future.
+                    cmpFut.markInitialized().get();
 
-                            cmpFut.add((IgniteInternalFuture<Object>)cpFut);
-                        }
+                    idxDfrgFut = new GridFinishedFuture<>();
 
-                        // A bit too general for now, but I like it more then saving only the last checkpoint future.
-                        cmpFut.markInitialized().get();
+                    if (filePageStoreMgr.hasIndexStore(grpId)) {
+                        defragmentIndexPartition(oldGrpCtx, newGrpCtx, linkMapByPart);
 
-                        idxDfrgFut = new GridFinishedFuture<>();
+                        idxDfrgFut = defragmentationCheckpoint
+                            .forceCheckpoint("index defragmented", null)
+                            .futureFor(CheckpointState.FINISHED);
+                    }
 
-                        if (filePageStoreMgr.hasIndexStore(grpId)) {
-                            defragmentIndexPartition(oldGrpCtx, newGrpCtx, linkMapByPart);
+                    idxDfrgFut = idxDfrgFut.chain(fut -> {
+                        oldPageMem.invalidate(grpId, PageIdAllocator.INDEX_PARTITION);
 
-                            idxDfrgFut = defragmentationCheckpoint
-                                .forceCheckpoint("index defragmented", null)
-                                .futureFor(CheckpointState.FINISHED);
-                        }
+                        PageMemoryEx partPageMem = (PageMemoryEx)partDataRegion.pageMemory();
 
-                        idxDfrgFut = idxDfrgFut.chain(fut -> {
-                            oldPageMem.invalidate(grpId, PageIdAllocator.INDEX_PARTITION);
+                        partPageMem.invalidate(grpId, PageIdAllocator.INDEX_PARTITION);
 
-                            PageMemoryEx partPageMem = (PageMemoryEx)partDataRegion.pageMemory();
+                        DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partPageMem.pageManager();
 
-                            partPageMem.invalidate(grpId, PageIdAllocator.INDEX_PARTITION);
+                        pageMgr.pageStoreMap().removePageStore(grpId, PageIdAllocator.INDEX_PARTITION);
 
-                            DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partPageMem.pageManager();
+                        PageMemoryEx mappingPageMem = (PageMemoryEx)mappingDataRegion.pageMemory();
 
-                            pageMgr.pageStoreMap().removePageStore(grpId, PageIdAllocator.INDEX_PARTITION);
+                        pageMgr = (DefragmentationPageReadWriteManager)mappingPageMem.pageManager();
 
-                            PageMemoryEx mappingPageMem = (PageMemoryEx)mappingDataRegion.pageMemory();
+                        pageMgr.pageStoreMap().clear(grpId);
 
-                            pageMgr = (DefragmentationPageReadWriteManager)mappingPageMem.pageManager();
+                        renameTempIndexFile(workDir);
 
-                            pageMgr.pageStoreMap().clear(grpId);
+                        writeDefragmentationCompletionMarker(filePageStoreMgr.getPageStoreFileIoFactory(), workDir, log);
 
-                            renameTempIndexFile(workDir);
+                        batchRenameDefragmentedCacheGroupPartitions(workDir, log);
 
-                            writeDefragmentationCompletionMarker(filePageStoreMgr.getPageStoreFileIoFactory(), workDir, log);
+                        return null;
+                    });
 
-                            batchRenameDefragmentedCacheGroupPartitions(workDir, log);
-                            return null;
-                        });
-                    }
+                    PageStore oldIdxPageStore = filePageStoreMgr.getStore(grpId, INDEX_PARTITION);
+
+                    status.onIndexDefragmented(
+                        oldGrpCtx,
+                        oldIdxPageStore.size(),
+                        pageSize + idxAllocationTracker.get() * pageSize // + file header.
+                    );
                 }
                 catch (DefragmentationCancelledException e) {
                     DefragmentationFileUtils.deleteLeftovers(workDir);
 
                     throw e;
                 }
+
+                status.onCacheGroupFinish(oldGrpCtx);
             }
 
             if (idxDfrgFut != null)
@@ -460,18 +498,24 @@ public class CachePartitionDefragmentationManager {
 
             mntcReg.unregisterMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME);
 
-            log.info("Defragmentation completed. All partitions are defragmented.");
+            status.onFinish();
 
             completionFut.onDone();
         }
         catch (DefragmentationCancelledException e) {
             mntcReg.unregisterMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME);
 
-            log.info("Defragmentation has been cancelled.");
+            log.info("Defragmentation process has been cancelled.");
+
+            status.onFinish();
 
             completionFut.onDone();
         }
         catch (Throwable t) {
+            log.error("Defragmentation process failed.", t);
+
+            status.onFinish();
+
             completionFut.onDone(t);
 
             throw t;
@@ -553,7 +597,7 @@ public class CachePartitionDefragmentationManager {
 
     /** */
     public String status() {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        return status.toString();
     }
 
     /**
@@ -918,4 +962,209 @@ public class CachePartitionDefragmentationManager {
         /** Serial version uid. */
         private static final long serialVersionUID = 0L;
     }
+
+    /** Synchronized tracker of per-group defragmentation progress; its {@code toString()} backs {@code status()}. */
+    private class DefragmentationStatus {
+        /** Wall-clock time in milliseconds taken in {@link #onStart}. */
+        private long startTs;
+
+        /** Wall-clock time in milliseconds taken in {@link #onFinish}. */
+        private long finishTs;
+
+        /** Sorted names of groups passed to {@link #onStart} that were neither started nor skipped yet. */
+        private final Set<String> scheduledGroups = new TreeSet<>();
+
+        /** Progress of groups that were started but not finished yet, ordered by cache or group name. */
+        private final Map<CacheGroupContext, DefragmentationCacheGroupProgress> progressGroups
+            = new TreeMap<>(comparing(CacheGroupContext::cacheOrGroupName));
+
+        /** Final progress of groups reported via {@link #onCacheGroupFinish}, ordered by cache or group name. */
+        private final Map<CacheGroupContext, DefragmentationCacheGroupProgress> finishedGroups
+            = new TreeMap<>(comparing(CacheGroupContext::cacheOrGroupName));
+
+        /** Sorted names of groups reported via {@link #onCacheGroupSkipped}. */
+        private final Set<String> skippedGroups = new TreeSet<>();
+
+        /** Records the start time and the names of all groups scheduled for defragmentation, then logs the start. */
+        public synchronized void onStart(Set<CacheGroupContext> scheduledGroups) {
+            startTs = System.currentTimeMillis();
+
+            for (CacheGroupContext grp : scheduledGroups) {
+                this.scheduledGroups.add(grp.cacheOrGroupName());
+            }
+
+            log.info("Defragmentation started.");
+        }
+
+        /** Moves the group from "scheduled" to "in progress"; {@code parts} becomes its total partition count. */
+        public synchronized void onCacheGroupStart(CacheGroupContext grpCtx, int parts) {
+            scheduledGroups.remove(grpCtx.cacheOrGroupName());
+
+            progressGroups.put(grpCtx, new DefragmentationCacheGroupProgress(parts));
+        }
+
+        /** Adds one processed partition and its sizes to the group; the group must currently be in progress. */
+        public synchronized void onPartitionDefragmented(CacheGroupContext grpCtx, long oldSize, long newSize) {
+            progressGroups.get(grpCtx).onPartitionDefragmented(oldSize, newSize);
+        }
+
+        /** Adds index sizes to the group's totals; the group must currently be in progress. */
+        public synchronized void onIndexDefragmented(CacheGroupContext grpCtx, long oldSize, long newSize) {
+            progressGroups.get(grpCtx).onIndexDefragmented(oldSize, newSize);
+        }
+
+        /** Stamps the group's finish time and moves it from "in progress" to "finished". */
+        public synchronized void onCacheGroupFinish(CacheGroupContext grpCtx) {
+            DefragmentationCacheGroupProgress progress = progressGroups.remove(grpCtx);
+
+            progress.onFinish();
+
+            finishedGroups.put(grpCtx, progress);
+        }
+
+        /** Moves the group's name from "scheduled" to "skipped". */
+        public synchronized void onCacheGroupSkipped(CacheGroupContext grpCtx) {
+            scheduledGroups.remove(grpCtx.cacheOrGroupName());
+
+            skippedGroups.add(grpCtx.cacheOrGroupName());
+        }
+
+        /** Records the finish time, forgets scheduled and in-progress groups and logs the total duration. */
+        public synchronized void onFinish() {
+            finishTs = System.currentTimeMillis();
+
+            progressGroups.clear();
+
+            scheduledGroups.clear();
+
+            log.info("Defragmentation process completed. Time: " + (finishTs - startTs) * 1e-3 + "s.");
+        }
+
+        /** Renders finished, in-progress, skipped and awaiting groups in that order; empty sections are omitted. */
+        @Override public synchronized String toString() {
+            StringBuilder sb = new StringBuilder();
+
+            if (!finishedGroups.isEmpty()) {
+                sb.append("Defragmentation is completed for cache groups:\n");
+
+                for (Map.Entry<CacheGroupContext, DefragmentationCacheGroupProgress> entry : finishedGroups.entrySet()) {
+                    sb.append("    ").append(entry.getKey().cacheOrGroupName()).append(" - ");
+
+                    sb.append(entry.getValue().toString()).append('\n');
+                }
+            }
+
+            if (!progressGroups.isEmpty()) {
+                sb.append("Defragmentation is in progress for cache groups:\n");
+
+                for (Map.Entry<CacheGroupContext, DefragmentationCacheGroupProgress> entry : progressGroups.entrySet()) {
+                    sb.append("    ").append(entry.getKey().cacheOrGroupName()).append(" - ");
+
+                    sb.append(entry.getValue().toString()).append('\n');
+                }
+            }
+
+            if (!skippedGroups.isEmpty())
+                sb.append("Skipped cache groups: ").append(String.join(", ", skippedGroups)).append('\n');
+
+            if (!scheduledGroups.isEmpty())
+                sb.append("Awaiting defragmentation: ").append(String.join(", ", scheduledGroups)).append('\n');
+
+            return sb.toString();
+        }
+    }
+
+    /** Progress of a single cache group: processed partition count, accumulated sizes and timing. */
+    private static class DefragmentationCacheGroupProgress {
+        /** Formats a megabyte value with at most two fraction digits, using US symbols (dot as decimal separator). */
+        private static final DecimalFormat MB_FORMAT = new DecimalFormat(
+            "#.##",
+            DecimalFormatSymbols.getInstance(Locale.US)
+        );
+
+        /** Total partition count passed to the constructor. */
+        private final int partsTotal;
+
+        /** Number of partitions reported via {@link #onPartitionDefragmented}. */
+        private int partsCompleted;
+
+        /** Accumulated "before" size of reported partitions and index; divided by 1024 * 1024 when printed as MB. */
+        private long oldSize;
+
+        /** Accumulated "after" size of reported partitions and index; divided by 1024 * 1024 when printed as MB. */
+        private long newSize;
+
+        /** Wall-clock time in milliseconds taken when this object was created. */
+        private final long startTs;
+
+        /** Wall-clock time in milliseconds taken in {@link #onFinish}; {@code 0} while the group is in progress. */
+        private long finishTs;
+
+        /** @param parts Total partition count, printed as the "all" part of "partitions processed/all". */
+        public DefragmentationCacheGroupProgress(int parts) {
+            partsTotal = parts;
+
+            startTs = System.currentTimeMillis();
+        }
+
+        /**
+         * @param oldSize Partition size before defragmentation, added to the group's "before" total.
+         * @param newSize Partition size after defragmentation, added to the group's "after" total.
+         */
+        public void onPartitionDefragmented(long oldSize, long newSize) {
+            ++partsCompleted;
+
+            this.oldSize += oldSize;
+            this.newSize += newSize;
+        }
+
+        /**
+         * @param oldSize Index size before defragmentation, added to the group's "before" total.
+         * @param newSize Index size after defragmentation, added to the group's "after" total.
+         */
+        public void onIndexDefragmented(long oldSize, long newSize) {
+            this.oldSize += oldSize;
+            this.newSize += newSize;
+        }
+
+        /** Stamps the finish time, which switches {@link #toString()} from the progress line to the size summary. */
+        public void onFinish() {
+            finishTs = System.currentTimeMillis();
+        }
+
+        /** Prints processed/all partitions and elapsed time while in progress, sizes before/after and duration after. */
+        @Override public String toString() {
+            StringBuilder sb = new StringBuilder();
+
+            if (finishTs == 0) {
+                sb.append("partitions processed/all: ").append(partsCompleted).append("/").append(partsTotal);
+
+                sb.append(", time elapsed: ");
+
+                appendDuration(sb, System.currentTimeMillis());
+            }
+            else {
+                double mb = 1024 * 1024;
+
+                sb.append("size before/after: ").append(MB_FORMAT.format(oldSize / mb)).append("MB/");
+                sb.append(MB_FORMAT.format(newSize / mb)).append("MB");
+
+                sb.append(", time took: ");
+
+                appendDuration(sb, finishTs);
+            }
+
+            return sb.toString();
+        }
+
+        /** Appends the time from {@code startTs} to {@code end}, rounded to whole seconds, as "M mins S secs". */
+        private void appendDuration(StringBuilder sb, long end) {
+            long duration = Math.round((end - startTs) * 1e-3);
+
+            long mins = duration / 60;
+            long secs = duration % 60;
+
+            sb.append(mins).append(" mins ").append(secs).append(" secs");
+        }
+    }
 }