You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2020/12/07 07:50:39 UTC

[ignite] branch master updated: IGNITE-13697 Schedule and cancel control utility commands for defragmentation feature - Fixes #8449.

This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 0de72a9  IGNITE-13697 Schedule and cancel control utility commands for defragmentation feature - Fixes #8449.
0de72a9 is described below

commit 0de72a908eac64d274aafdb59dd2f5598d0c470e
Author: ibessonov <be...@gmail.com>
AuthorDate: Mon Dec 7 10:48:07 2020 +0300

    IGNITE-13697 Schedule and cancel control utility commands for defragmentation feature - Fixes #8449.
    
    Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
 .../ignite/internal/commandline/CommandList.java   |   5 +-
 .../commandline/DefragmentationCommand.java        | 241 +++++++++++
 .../defragmentation/DefragmentationArguments.java  |  63 +++
 .../DefragmentationSubcommands.java                |  68 ++++
 .../commandline/CommandHandlerParsingTest.java     |   3 +-
 .../testsuites/IgniteControlUtilityTestSuite.java  |   3 +
 .../GridCommandHandlerDefragmentationTest.java     | 331 +++++++++++++++
 .../internal/maintenance/MaintenanceProcessor.java |  27 +-
 .../GridCacheDatabaseSharedManager.java            |  28 +-
 .../CachePartitionDefragmentationManager.java      | 444 +++++++++++++--------
 .../defragmentation/DefragmentationFileUtils.java  |  29 +-
 .../maintenance/DefragmentationParameters.java     |  34 +-
 .../DefragmentationWorkflowCallback.java           |  13 +-
 .../maintenance/ExecuteDefragmentationAction.java  |  33 +-
 ...nAction.java => StopDefragmentationAction.java} |  39 +-
 .../processors/query/GridQueryIndexing.java        |   4 +-
 .../apache/ignite/internal/util/IgniteUtils.java   |  16 +-
 .../VisorDefragmentationOperation.java             |  28 ++
 .../defragmentation/VisorDefragmentationTask.java  | 211 ++++++++++
 .../VisorDefragmentationTaskArg.java               |  91 +++++
 .../VisorDefragmentationTaskResult.java            |  72 ++++
 .../ignite/maintenance/MaintenanceRegistry.java    |   3 +-
 .../main/resources/META-INF/classnames.properties  |   5 +
 .../cache/WalModeChangeAdvancedSelfTest.java       |  13 +-
 .../persistence/IgnitePdsDefragmentationTest.java  | 108 +++--
 .../processors/query/DummyQueryIndexing.java       |   3 +-
 ...ridCommandHandlerClusterByClassTest_help.output |   9 +
 ...andHandlerClusterByClassWithSSLTest_help.output |   9 +
 .../processors/query/h2/IgniteH2Indexing.java      |   5 +-
 .../defragmentation/IndexingDefragmentation.java   |   6 +
 .../IgnitePdsIndexingDefragmentationTest.java      |   2 +
 31 files changed, 1659 insertions(+), 287 deletions(-)

diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandList.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandList.java
index e16acaa..e9f8cd3 100644
--- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandList.java
+++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandList.java
@@ -92,7 +92,10 @@ public enum CommandList {
     METRIC("--metric", new MetricCommand()),
 
     /** */
-    PERSISTENCE("--persistence", new PersistenceCommand());
+    PERSISTENCE("--persistence", new PersistenceCommand()),
+
+    /** Command to manage PDS defragmentation. */
+    DEFRAGMENTATION("--defragmentation", new DefragmentationCommand());
 
     /** Private values copy so there's no need in cloning it every time. */
     private static final CommandList[] VALUES = CommandList.values();
diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java
new file mode 100644
index 0000000..c2fa8e9
--- /dev/null
+++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.commandline;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.Set;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.client.GridClient;
+import org.apache.ignite.internal.client.GridClientConfiguration;
+import org.apache.ignite.internal.client.GridClientNode;
+import org.apache.ignite.internal.commandline.defragmentation.DefragmentationArguments;
+import org.apache.ignite.internal.commandline.defragmentation.DefragmentationSubcommands;
+import org.apache.ignite.internal.visor.VisorTaskArgument;
+import org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationOperation;
+import org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationTask;
+import org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationTaskArg;
+import org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationTaskResult;
+
+import static org.apache.ignite.internal.commandline.Command.usage;
+import static org.apache.ignite.internal.commandline.CommandList.DEFRAGMENTATION;
+import static org.apache.ignite.internal.commandline.defragmentation.DefragmentationSubcommands.CANCEL;
+import static org.apache.ignite.internal.commandline.defragmentation.DefragmentationSubcommands.SCHEDULE;
+
+/** */
+public class DefragmentationCommand implements Command<DefragmentationArguments> {
+    /** */
+    private static final String NODES_ARG = "--nodes";
+
+    /** */
+    private static final String CACHES_ARG = "--caches";
+
+    /** */
+    private DefragmentationArguments args;
+
+    /** {@inheritDoc} */
+    @Override public Object execute(GridClientConfiguration clientCfg, Logger log) throws Exception {
+        try (GridClient client = Command.startClient(clientCfg)) {
+            Optional<GridClientNode> firstNodeOpt = client.compute().nodes().stream().filter(GridClientNode::connectable).findFirst();
+
+            if (firstNodeOpt.isPresent()) {
+                VisorDefragmentationTaskResult res;
+
+                if (args.nodeIds() == null) {
+                    res = TaskExecutor.executeTaskByNameOnNode(
+                        client,
+                        VisorDefragmentationTask.class.getName(),
+                        convertArguments(),
+                        null, // Use node from clientCfg.
+                        clientCfg
+                    );
+                }
+                else {
+                    VisorTaskArgument<?> visorArg = new VisorTaskArgument<>(
+                        client.compute().nodes().stream().filter(
+                            node -> args.nodeIds().contains(node.consistentId().toString())
+                        ).map(GridClientNode::nodeId).collect(Collectors.toList()),
+                        convertArguments(),
+                        false
+                    );
+
+                    res = client.compute()
+                        .projection(firstNodeOpt.get())
+                        .execute(
+                            VisorDefragmentationTask.class.getName(),
+                            visorArg
+                        );
+                }
+
+                printResult(res, log);
+            }
+            else
+                log.warning("No nodes found in topology, command won't be executed.");
+        }
+        catch (Throwable t) {
+            log.severe("Failed to execute defragmentation command='" + args.subcommand().text() + "'");
+            log.severe(CommandLogger.errorMessage(t));
+
+            throw t;
+        }
+
+        return null;
+    }
+
+    /** */
+    private void printResult(VisorDefragmentationTaskResult res, Logger log) {
+        assert res != null;
+
+        log.info(res.getMessage());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void parseArguments(CommandArgIterator argIter) {
+        DefragmentationSubcommands cmd = DefragmentationSubcommands.of(argIter.nextArg("Expected defragmentation subcommand."));
+
+        if (cmd == null || cmd == DefragmentationSubcommands.STATUS) // Status subcommand is not yet completed.
+            throw new IllegalArgumentException("Expected correct defragmentation subcommand.");
+
+        args = new DefragmentationArguments(cmd);
+
+        switch (cmd) {
+            case SCHEDULE:
+                List<String> consistentIds = null;
+                List<String> cacheNames = null;
+
+                String subarg;
+
+                do {
+                    subarg = argIter.nextArg("Expected one of subcommand arguments.").toLowerCase(Locale.ENGLISH);
+
+                    switch (subarg) {
+                        case NODES_ARG: {
+                            Set<String> ids = argIter.nextStringSet(NODES_ARG);
+
+                            if (ids.isEmpty())
+                                throw new IllegalArgumentException("Consistent ids list is empty.");
+
+                            consistentIds = new ArrayList<>(ids);
+
+                            break;
+                        }
+
+                        case CACHES_ARG: {
+                            Set<String> ids = argIter.nextStringSet(CACHES_ARG);
+
+                            if (ids.isEmpty())
+                                throw new IllegalArgumentException("Caches list is empty.");
+
+                            cacheNames = new ArrayList<>(ids);
+
+                            break;
+                        }
+
+                        default:
+                            subarg = null;
+                    }
+                }
+                while (subarg != null);
+
+                if (consistentIds == null)
+                    throw new IllegalArgumentException("--nodes argument is missing.");
+
+                args.setNodeIds(consistentIds);
+                args.setCacheNames(cacheNames);
+
+                break;
+
+            case STATUS:
+            case CANCEL:
+                // No-op.
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public DefragmentationArguments arg() {
+        return args;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void printUsage(Logger log) {
+        String consistentIds = "consistentId0,consistentId1";
+
+        String cacheNames = "cache1,cache2,cache3";
+
+        usage(
+            log,
+            "Schedule PDS defragmentation on given nodes for all caches:",
+            DEFRAGMENTATION,
+            SCHEDULE.text(),
+            NODES_ARG,
+            consistentIds
+        );
+
+        usage(
+            log,
+            "Schedule PDS defragmentation on given nodes but only for given caches:",
+            DEFRAGMENTATION,
+            SCHEDULE.text(),
+            NODES_ARG,
+            consistentIds,
+            CACHES_ARG,
+            cacheNames
+        );
+
+        usage(
+            log,
+            "Cancel scheduled or active PDS defragmentation on underlying node:",
+            DEFRAGMENTATION,
+            CANCEL.text()
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return DEFRAGMENTATION.toCommandName();
+    }
+
+    /** */
+    private VisorDefragmentationTaskArg convertArguments() {
+        return new VisorDefragmentationTaskArg(
+            convertSubcommand(args.subcommand()),
+            args.nodeIds(),
+            args.cacheNames()
+        );
+    }
+
+    /** */
+    private static VisorDefragmentationOperation convertSubcommand(DefragmentationSubcommands subcmd) {
+        switch (subcmd) {
+            case SCHEDULE:
+                return VisorDefragmentationOperation.SCHEDULE;
+
+            case STATUS:
+                return VisorDefragmentationOperation.STATUS;
+
+            case CANCEL:
+                return VisorDefragmentationOperation.CANCEL;
+
+            default:
+                throw new IllegalArgumentException(subcmd.name());
+        }
+    }
+}
diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/defragmentation/DefragmentationArguments.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/defragmentation/DefragmentationArguments.java
new file mode 100644
index 0000000..e82e578
--- /dev/null
+++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/defragmentation/DefragmentationArguments.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.commandline.defragmentation;
+
+import java.util.List;
+
+/** */
+@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+public class DefragmentationArguments {
+    /** */
+    private final DefragmentationSubcommands subcmd;
+
+    /** */
+    private List<String> nodeIds;
+
+    /** */
+    private List<String> cacheNames;
+
+    /** */
+    public DefragmentationArguments(DefragmentationSubcommands subcmd) {
+        this.subcmd = subcmd;
+    }
+
+    /** */
+    public DefragmentationSubcommands subcommand() {
+        return subcmd;
+    }
+
+    /** */
+    public void setNodeIds(List<String> nodeIds) {
+        this.nodeIds = nodeIds;
+    }
+
+    /** */
+    public List<String> nodeIds() {
+        return nodeIds;
+    }
+
+    /** */
+    public void setCacheNames(List<String> cacheNames) {
+        this.cacheNames = cacheNames;
+    }
+
+    /** */
+    public List<String> cacheNames() {
+        return cacheNames;
+    }
+}
diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/defragmentation/DefragmentationSubcommands.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/defragmentation/DefragmentationSubcommands.java
new file mode 100644
index 0000000..86ec775
--- /dev/null
+++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/defragmentation/DefragmentationSubcommands.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.commandline.defragmentation;
+
+import org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationOperation;
+import org.jetbrains.annotations.Nullable;
+
+/** */
+public enum DefragmentationSubcommands {
+    /** */
+    SCHEDULE("schedule", VisorDefragmentationOperation.SCHEDULE),
+
+    /** */
+    STATUS("status", VisorDefragmentationOperation.STATUS),
+
+    /** */
+    CANCEL("cancel", VisorDefragmentationOperation.CANCEL);
+
+    /** */
+    private final String name;
+
+    /** */
+    private final VisorDefragmentationOperation visorOperation;
+
+    /** */
+    DefragmentationSubcommands(String name, VisorDefragmentationOperation visorOperation) {
+        this.name = name;
+        this.visorOperation = visorOperation;
+    }
+
+    /**
+     * @param strRep String representation of subcommand.
+     * @return Subcommand for its string representation.
+     */
+    public static @Nullable DefragmentationSubcommands of(String strRep) {
+        for (DefragmentationSubcommands cmd : values()) {
+            if (cmd.text().equalsIgnoreCase(strRep))
+                return cmd;
+        }
+
+        return null;
+    }
+
+    /** */
+    public String text() {
+        return name;
+    }
+
+    /** */
+    public VisorDefragmentationOperation operation() {
+        return visorOperation;
+    }
+}
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java b/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java
index b3e51cd..b1ab5f8 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java
@@ -1033,6 +1033,7 @@ public class CommandHandlerParsingTest {
             cmd == CommandList.WARM_UP ||
             cmd == CommandList.PROPERTY ||
             cmd == CommandList.SYSTEM_VIEW ||
-            cmd == CommandList.METRIC;
+            cmd == CommandList.METRIC ||
+            cmd == CommandList.DEFRAGMENTATION;
     }
 }
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
index dbccfa5..cac12d4 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
@@ -25,6 +25,7 @@ import org.apache.ignite.util.GridCommandHandlerBrokenIndexTest;
 import org.apache.ignite.util.GridCommandHandlerCheckIndexesInlineSizeTest;
 import org.apache.ignite.util.GridCommandHandlerClusterByClassTest;
 import org.apache.ignite.util.GridCommandHandlerClusterByClassWithSSLTest;
+import org.apache.ignite.util.GridCommandHandlerDefragmentationTest;
 import org.apache.ignite.util.GridCommandHandlerIndexForceRebuildTest;
 import org.apache.ignite.util.GridCommandHandlerIndexListTest;
 import org.apache.ignite.util.GridCommandHandlerIndexRebuildStatusTest;
@@ -82,6 +83,8 @@ import org.junit.runners.Suite;
 
     GridCommandHandlerPropertiesTest.class,
 
+    GridCommandHandlerDefragmentationTest.class,
+
     SystemViewCommandTest.class,
     MetricCommandTest.class
 })
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerDefragmentationTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerDefragmentationTest.java
new file mode 100644
index 0000000..adce9e2
--- /dev/null
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerDefragmentationTest.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.util;
+
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.UnaryOperator;
+import java.util.logging.Formatter;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.logging.StreamHandler;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.commandline.CommandHandler;
+import org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance.DefragmentationParameters;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.maintenance.MaintenanceTask;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.junit.Test;
+
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_INVALID_ARGUMENTS;
+import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
+
+/** */
+public class GridCommandHandlerDefragmentationTest extends GridCommandHandlerClusterPerMethodAbstractTest {
+    /** */
+    private static CountDownLatch blockCdl;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.getDataStorageConfiguration().setWalSegmentSize(512 * 1024).setWalSegments(3);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testDefragmentationSchedule() throws Exception {
+        Ignite ignite = startGrids(2);
+
+        ignite.cluster().state(ACTIVE);
+
+        assertEquals(EXIT_CODE_INVALID_ARGUMENTS, execute("--defragmentation", "schedule"));
+
+        String grid0ConsId = grid(0).configuration().getConsistentId().toString();
+        String grid1ConsId = grid(1).configuration().getConsistentId().toString();
+
+        ListeningTestLogger testLog = new ListeningTestLogger();
+
+        CommandHandler cmd = createCommandHandler(testLog);
+
+        LogListener logLsnr = LogListener.matches("Scheduling completed successfully.").build();
+
+        testLog.registerListener(logLsnr);
+
+        assertEquals(EXIT_CODE_OK, execute(
+            cmd,
+            "--defragmentation",
+            "schedule",
+            "--nodes",
+            grid0ConsId
+        ));
+
+        assertTrue(logLsnr.check());
+
+        MaintenanceTask mntcTask = DefragmentationParameters.toStore(Collections.emptyList());
+
+        assertNotNull(grid(0).context().maintenanceRegistry().registerMaintenanceTask(mntcTask));
+        assertNull(grid(1).context().maintenanceRegistry().registerMaintenanceTask(mntcTask));
+
+        stopGrid(0);
+        startGrid(0);
+
+        logLsnr = LogListener.matches("Node is already in Maintenance Mode").build();
+
+        testLog.clearListeners();
+
+        testLog.registerListener(logLsnr);
+
+        assertEquals(EXIT_CODE_OK, execute(
+            cmd,
+            "--defragmentation",
+            "schedule",
+            "--nodes",
+            grid0ConsId
+        ));
+
+        assertTrue(logLsnr.check());
+
+        stopGrid(0);
+        startGrid(0);
+
+        stopGrid(1);
+        startGrid(1);
+
+        stopAllGrids();
+
+        startGrids(2);
+
+        logLsnr = LogListener.matches("Scheduling completed successfully.").times(2).build();
+
+        testLog.clearListeners();
+
+        testLog.registerListener(logLsnr);
+
+        assertEquals(EXIT_CODE_OK, execute(
+            cmd,
+            "--defragmentation",
+            "schedule",
+            "--nodes",
+            String.join(",", grid0ConsId, grid1ConsId)
+        ));
+
+        assertTrue(logLsnr.check());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testDefragmentationCancel() throws Exception {
+        Ignite ignite = startGrids(2);
+
+        ignite.cluster().state(ACTIVE);
+
+        String grid0ConsId = grid(0).configuration().getConsistentId().toString();
+
+        ListeningTestLogger testLog = new ListeningTestLogger();
+
+        CommandHandler cmd = createCommandHandler(testLog);
+
+        assertEquals(EXIT_CODE_OK, execute(
+            cmd,
+            "--defragmentation",
+            "schedule",
+            "--nodes",
+            grid0ConsId
+        ));
+
+        LogListener logLsnr = LogListener.matches("Scheduled defragmentation task cancelled successfully.").atLeast(1).build();
+
+        testLog.registerListener(logLsnr);
+
+        assertEquals(EXIT_CODE_OK, execute(
+            cmd,
+            "--port",
+            grid(0).localNode().attribute(IgniteNodeAttributes.ATTR_REST_TCP_PORT).toString(),
+            "--defragmentation",
+            "cancel"
+        ));
+
+        assertTrue(logLsnr.check());
+
+        testLog.clearListeners();
+
+        logLsnr = LogListener.matches("Scheduled defragmentation task is not found.").build();
+
+        testLog.registerListener(logLsnr);
+
+        assertEquals(EXIT_CODE_OK, execute(
+            cmd,
+            "--port",
+            grid(1).localNode().attribute(IgniteNodeAttributes.ATTR_REST_TCP_PORT).toString(),
+            "--defragmentation",
+            "cancel"
+        ));
+
+        assertTrue(logLsnr.check());
+    }
+
+    /** */
+    @Test
+    public void testDefragmentationCancelInProgress() throws Exception {
+        IgniteEx ig = startGrid(0);
+
+        ig.cluster().state(ClusterState.ACTIVE);
+
+        IgniteCache<Object, Object> cache = ig.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < 1024; i++)
+            cache.put(i, i);
+
+        forceCheckpoint(ig);
+
+        String grid0ConsId = ig.configuration().getConsistentId().toString();
+
+        ListeningTestLogger testLog = new ListeningTestLogger();
+
+        CommandHandler cmd = createCommandHandler(testLog);
+
+        assertEquals(EXIT_CODE_OK, execute(
+            cmd,
+            "--defragmentation",
+            "schedule",
+            "--nodes",
+            grid0ConsId
+        ));
+
+        String port = grid(0).localNode().attribute(IgniteNodeAttributes.ATTR_REST_TCP_PORT).toString();
+
+        stopGrid(0);
+
+        blockCdl = new CountDownLatch(128);
+
+        UnaryOperator<IgniteConfiguration> cfgOp = cfg -> {
+            DataStorageConfiguration dsCfg = cfg.getDataStorageConfiguration();
+
+            FileIOFactory delegate = dsCfg.getFileIOFactory();
+
+            dsCfg.setFileIOFactory((file, modes) -> {
+                if (file.getName().contains("dfrg")) {
+                    if (blockCdl.getCount() == 0) {
+                        try {
+                            // Slow down defragmentation process.
+                            // This'll be enough for the test since we have, like, 900 partitions left.
+                            Thread.sleep(100);
+                        }
+                        catch (InterruptedException ignore) {
+                            // No-op.
+                        }
+                    }
+                    else
+                        blockCdl.countDown();
+                }
+
+                return delegate.create(file, modes);
+            });
+
+            return cfg;
+        };
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
+            try {
+                startGrid(0, cfgOp);
+            }
+            catch (Exception e) {
+                // No-op.
+                throw new RuntimeException(e);
+            }
+        });
+
+        blockCdl.await();
+
+        LogListener logLsnr = LogListener.matches("Defragmentation cancelled successfully.").build();
+
+        testLog.registerListener(logLsnr);
+
+        assertEquals(EXIT_CODE_OK, execute(
+            cmd,
+            "--port",
+            port,
+            "--defragmentation",
+            "cancel"
+        ));
+
+        assertTrue(logLsnr.check());
+
+        fut.get();
+
+        testLog.clearListeners();
+
+        logLsnr = LogListener.matches("Defragmentation is already completed or has been cancelled previously.").build();
+
+        testLog.registerListener(logLsnr);
+
+        assertEquals(EXIT_CODE_OK, execute(
+            cmd,
+            "--port",
+            port,
+            "--defragmentation",
+            "cancel"
+        ));
+
+        assertTrue(logLsnr.check());
+    }
+
+    /** */
+    private CommandHandler createCommandHandler(ListeningTestLogger testLog) {
+        Logger log = CommandHandler.initLogger(null);
+
+        log.addHandler(new StreamHandler(System.out, new Formatter() {
+            /** {@inheritDoc} */
+            @Override public String format(LogRecord record) {
+                String msg = record.getMessage();
+
+                testLog.info(msg);
+
+                return msg + "\n";
+            }
+        }));
+
+        return new CommandHandler(log);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/maintenance/MaintenanceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/maintenance/MaintenanceProcessor.java
index 063bd47..5328d13 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/maintenance/MaintenanceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/maintenance/MaintenanceProcessor.java
@@ -61,6 +61,9 @@ public class MaintenanceProcessor extends GridProcessorAdapter implements Mainte
     /** */
     private final boolean inMemoryMode;
 
+    /** */
+    private volatile boolean maintenanceMode;
+
     /**
      * @param ctx Kernal context.
      */
@@ -99,7 +102,7 @@ public class MaintenanceProcessor extends GridProcessorAdapter implements Mainte
             log.info(
                 "Maintenance Task with name " + task.name() +
                     " is already registered" +
-                    oldTask.parameters() != null ? " with parameters " + oldTask.parameters() : "" + "." +
+                    (oldTask.parameters() != null ? " with parameters " + oldTask.parameters() : ".") +
                     " It will be replaced with new task" +
                     task.parameters() != null ? " with parameters " + task.parameters() : "" + "."
             );
@@ -134,6 +137,8 @@ public class MaintenanceProcessor extends GridProcessorAdapter implements Mainte
             fileStorage.init();
 
             activeTasks.putAll(fileStorage.getAllTasks());
+
+            maintenanceMode = !activeTasks.isEmpty();
         }
         catch (Throwable t) {
             log.warning("Caught exception when starting MaintenanceProcessor," +
@@ -213,18 +218,20 @@ public class MaintenanceProcessor extends GridProcessorAdapter implements Mainte
 
     /** {@inheritDoc} */
     @Override public boolean isMaintenanceMode() {
-        return !activeTasks.isEmpty();
+        return maintenanceMode;
     }
 
     /** {@inheritDoc} */
-    @Override public void unregisterMaintenanceTask(String maintenanceTaskName) {
+    @Override public boolean unregisterMaintenanceTask(String maintenanceTaskName) {
         if (inMemoryMode)
-            return;
+            return false;
+
+        boolean deleted;
 
         if (isMaintenanceMode())
-            activeTasks.remove(maintenanceTaskName);
+            deleted = activeTasks.remove(maintenanceTaskName) != null;
         else
-            requestedTasks.remove(maintenanceTaskName);
+            deleted = requestedTasks.remove(maintenanceTaskName) != null;
 
         try {
             fileStorage.deleteMaintenanceTask(maintenanceTaskName);
@@ -237,6 +244,8 @@ public class MaintenanceProcessor extends GridProcessorAdapter implements Mainte
 
             fileStorage.clear();
         }
+
+        return deleted;
     }
 
     /** {@inheritDoc} */
@@ -247,14 +256,14 @@ public class MaintenanceProcessor extends GridProcessorAdapter implements Mainte
         List<MaintenanceAction<?>> actions = cb.allActions();
 
         if (actions == null || actions.isEmpty())
-            throw new IgniteException("Maintenance workflow callback should provide at least one mainetance action");
+            throw new IgniteException("Maintenance workflow callback should provide at least one maintenance action");
 
         int size = actions.size();
-        long distinctSize = actions.stream().map(a -> a.name()).distinct().count();
+        long distinctSize = actions.stream().map(MaintenanceAction::name).distinct().count();
 
         if (distinctSize < size)
             throw new IgniteException("All actions of a single workflow should have unique names: " +
-                actions.stream().map(a -> a.name()).collect(Collectors.joining(", ")));
+                actions.stream().map(MaintenanceAction::name).collect(Collectors.joining(", ")));
 
         Optional<String> wrongActionName = actions
             .stream()
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 2c366eb..1484e30 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -748,7 +748,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     }
 
     /** */
-    private void prepareCacheDefragmentation(List<Integer> cacheGroupIds) throws IgniteCheckedException {
+    private void prepareCacheDefragmentation(List<String> cacheNames) throws IgniteCheckedException {
         GridKernalContext kernalCtx = cctx.kernalContext();
         DataStorageConfiguration dsCfg = kernalCtx.config().getDataStorageConfiguration();
 
@@ -778,7 +778,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         lightCheckpointMgr.start();
 
         defrgMgr = new CachePartitionDefragmentationManager(
-            cacheGroupIds,
+            cacheNames,
             cctx,
             this,
             (FilePageStoreManager)cctx.pageStore(),
@@ -788,6 +788,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         );
     }
 
+    /** */
+    public CachePartitionDefragmentationManager defragmentationManager() {
+        return defrgMgr;
+    }
+
     /** {@inheritDoc} */
     @Override public DataRegion addDataRegion(DataStorageConfiguration dataStorageCfg, DataRegionConfiguration dataRegionCfg,
         boolean trackable, PageReadWriteManager pmPageMgr) throws IgniteCheckedException {
@@ -826,9 +831,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                     .registerWorkflowCallbackIfTaskExists(
                         DEFRAGMENTATION_MNTC_TASK_NAME,
                         task -> {
-                            prepareCacheDefragmentation(fromStore(task).cacheGroupIds());
+                            prepareCacheDefragmentation(fromStore(task).cacheNames());
 
-                            return new DefragmentationWorkflowCallback(cctx.kernalContext()::log, defrgMgr);
+                            return new DefragmentationWorkflowCallback(
+                                cctx.kernalContext()::log,
+                                defrgMgr,
+                                cctx.kernalContext().failure()
+                            );
                         }
                     );
             }
@@ -1097,6 +1106,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
     /** {@inheritDoc} */
     @Override protected void onKernalStop0(boolean cancel) {
+        if (defrgMgr != null)
+            defrgMgr.cancel();
+
         checkpointManager.stop(cancel);
 
         super.onKernalStop0(cancel);
@@ -1395,6 +1407,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
     /** {@inheritDoc} */
     @Override public void rebuildIndexesIfNeeded(GridDhtPartitionsExchangeFuture exchangeFut) {
+        if (defrgMgr != null)
+            return;
+
         rebuildIndexes(cctx.cacheContexts(), (cacheCtx) -> cacheCtx.startTopologyVersion().equals(exchangeFut.initialVersion()));
     }
 
@@ -1887,6 +1902,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         cctx.wal().resumeLogging(walTail);
     }
 
+    /** */
+    public void preserveWalTailPointer() throws IgniteCheckedException {
+        walTail = cctx.wal().flush(null, true);
+    }
+
     /**
      * @param grpId Cache group id.
      * @param partId Partition ID.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
index 006fa8e..41999fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.LongConsumer;
@@ -102,7 +103,7 @@ public class CachePartitionDefragmentationManager {
     public static final String DEFRAGMENTATION_MNTC_TASK_NAME = "defragmentationMaintenanceTask";
 
     /** */
-    private final Set<Integer> cacheGroupsForDefragmentation;
+    private final Set<String> cachesForDefragmentation;
 
     /** Cache shared context. */
     private final GridCacheSharedContext<?, ?> sharedCtx;
@@ -137,8 +138,14 @@ public class CachePartitionDefragmentationManager {
     /** */
     private final DataRegion mappingDataRegion;
 
+    /** */
+    private final AtomicBoolean cancel = new AtomicBoolean();
+
+    /** */
+    private final GridFutureAdapter<?> completionFut = new GridFutureAdapter<>();
+
     /**
-     * @param cacheGrpIds
+     * @param cacheNames Names of caches to be defragmented. Empty means "all".
      * @param sharedCtx Cache shared context.
      * @param dbMgr Database manager.
      * @param filePageStoreMgr File page store manager.
@@ -147,7 +154,7 @@ public class CachePartitionDefragmentationManager {
      * @param pageSize Page size.
      */
     public CachePartitionDefragmentationManager(
-        List<Integer> cacheGrpIds,
+        List<String> cacheNames,
         GridCacheSharedContext<?, ?> sharedCtx,
         GridCacheDatabaseSharedManager dbMgr,
         FilePageStoreManager filePageStoreMgr,
@@ -155,7 +162,7 @@ public class CachePartitionDefragmentationManager {
         LightweightCheckpointManager defragmentationCheckpoint,
         int pageSize
     ) throws IgniteCheckedException {
-        cacheGroupsForDefragmentation = new HashSet<>(cacheGrpIds);
+        cachesForDefragmentation = new HashSet<>(cacheNames);
 
         this.dbMgr = dbMgr;
         this.filePageStoreMgr = filePageStoreMgr;
@@ -172,19 +179,24 @@ public class CachePartitionDefragmentationManager {
     }
 
     /** */
-    public void executeDefragmentation() throws IgniteCheckedException {
-        log.info("Defragmentation started.");
+    public void beforeDefragmentation() throws IgniteCheckedException {
+        // Checkpointer must be enabled so all pages on disk are in their latest valid state.
+        dbMgr.resumeWalLogging();
 
-        try {
-            // Checkpointer must be enabled so all pages on disk are in their latest valid state.
-            dbMgr.resumeWalLogging();
+        dbMgr.onStateRestored(null);
 
-            dbMgr.onStateRestored(null);
+        nodeCheckpoint.forceCheckpoint("beforeDefragmentation", null).futureFor(FINISHED).get();
 
-            nodeCheckpoint.forceCheckpoint("beforeDefragmentation", null).futureFor(FINISHED).get();
+        dbMgr.preserveWalTailPointer();
 
-            sharedCtx.wal().onDeActivate(sharedCtx.kernalContext());
+        sharedCtx.wal().onDeActivate(sharedCtx.kernalContext());
+    }
+
+    /** */
+    public void executeDefragmentation() throws IgniteCheckedException {
+        log.info("Defragmentation started.");
 
+        try {
             // Now the actual process starts.
             TreeIterator treeIter = new TreeIterator(pageSize);
 
@@ -197,241 +209,272 @@ public class CachePartitionDefragmentationManager {
 
                 int grpId = oldGrpCtx.groupId();
 
-                if (!cacheGroupsForDefragmentation.isEmpty() && !cacheGroupsForDefragmentation.contains(grpId))
-                    continue;
+                if (!cachesForDefragmentation.isEmpty()) {
+                    if (oldGrpCtx.caches().stream().noneMatch(cctx -> cachesForDefragmentation.contains(cctx.name())))
+                        continue;
+                }
 
                 File workDir = filePageStoreMgr.cacheWorkDir(oldGrpCtx.sharedGroup(), oldGrpCtx.cacheOrGroupName());
 
-                if (skipAlreadyDefragmentedCacheGroup(workDir, grpId, log))
-                    continue;
+                try {
+                    if (skipAlreadyDefragmentedCacheGroup(workDir, grpId, log))
+                        continue;
 
-                GridCacheOffheapManager offheap = (GridCacheOffheapManager)oldGrpCtx.offheap();
+                    GridCacheOffheapManager offheap = (GridCacheOffheapManager)oldGrpCtx.offheap();
 
-                List<CacheDataStore> oldCacheDataStores = stream(offheap.cacheDataStores().spliterator(), false)
-                    .filter(store -> {
-                        try {
-                            return filePageStoreMgr.exists(grpId, store.partId());
-                        }
-                        catch (IgniteCheckedException e) {
-                            throw new IgniteException(e);
+                    List<CacheDataStore> oldCacheDataStores = stream(offheap.cacheDataStores().spliterator(), false)
+                        .filter(store -> {
+                            try {
+                                return filePageStoreMgr.exists(grpId, store.partId());
+                            }
+                            catch (IgniteCheckedException e) {
+                                throw new IgniteException(e);
+                            }
+                        })
+                        .collect(Collectors.toList());
+
+                    if (workDir != null && !oldCacheDataStores.isEmpty()) {
+                        // We can't start defragmentation of new group on the region that has wrong eviction mode.
+                        // So waiting of the previous cache group defragmentation is inevitable.
+                        DataPageEvictionMode curPageEvictionMode = oldGrpCtx.dataRegion().config().getPageEvictionMode();
+
+                        if (prevPageEvictionMode == null || prevPageEvictionMode != curPageEvictionMode) {
+                            prevPageEvictionMode = curPageEvictionMode;
+
+                            partDataRegion.config().setPageEvictionMode(curPageEvictionMode);
+
+                            if (idxDfrgFut != null)
+                                idxDfrgFut.get();
                         }
-                    })
-                    .collect(Collectors.toList());
 
-                if (workDir != null && !oldCacheDataStores.isEmpty()) {
-                    // We can't start defragmentation of new group on the region that has wrong eviction mode.
-                    // So waiting of the previous cache group defragmentation is inevitable.
-                    DataPageEvictionMode curPageEvictionMode = oldGrpCtx.dataRegion().config().getPageEvictionMode();
+                        IntMap<CacheDataStore> cacheDataStores = new IntHashMap<>();
 
-                    if (prevPageEvictionMode == null || prevPageEvictionMode != curPageEvictionMode) {
-                        prevPageEvictionMode = curPageEvictionMode;
+                        for (CacheDataStore store : offheap.cacheDataStores()) {
+                            // Tree can be null for not yet initialized partitions.
+                            // This would mean that these partitions are empty.
+                            assert store.tree() == null || store.tree().groupId() == grpId;
 
-                        partDataRegion.config().setPageEvictionMode(curPageEvictionMode);
+                            if (store.tree() != null)
+                                cacheDataStores.put(store.partId(), store);
+                        }
 
-                        if (idxDfrgFut != null)
-                            idxDfrgFut.get();
-                    }
+                        dbMgr.checkpointedDataRegions().remove(oldGrpCtx.dataRegion());
 
-                    IntMap<CacheDataStore> cacheDataStores = new IntHashMap<>();
+                        // Another cheat. Ttl cleanup manager knows too much shit.
+                        oldGrpCtx.caches().stream()
+                            .filter(cacheCtx -> cacheCtx.groupId() == grpId)
+                            .forEach(cacheCtx -> cacheCtx.ttl().unregister());
 
-                    for (CacheDataStore store : offheap.cacheDataStores()) {
-                        // Tree can be null for not yet initialized partitions.
-                        // This would mean that these partitions are empty.
-                        assert store.tree() == null || store.tree().groupId() == grpId;
+                        // Technically wal is already disabled, but "PageHandler.isWalDeltaRecordNeeded" doesn't care
+                        // and WAL records will be allocated anyway just to be ignored later if we don't disable WAL for
+                        // cache group explicitly.
+                        oldGrpCtx.localWalEnabled(false, false);
 
-                        if (store.tree() != null)
-                            cacheDataStores.put(store.partId(), store);
-                    }
+                        boolean encrypted = oldGrpCtx.config().isEncryptionEnabled();
 
-                    dbMgr.checkpointedDataRegions().remove(oldGrpCtx.dataRegion());
+                        FilePageStoreFactory pageStoreFactory = filePageStoreMgr.getPageStoreFactory(grpId, encrypted);
 
-                    // Another cheat. Ttl cleanup manager knows too much shit.
-                    oldGrpCtx.caches().stream()
-                        .filter(cacheCtx -> cacheCtx.groupId() == grpId)
-                        .forEach(cacheCtx -> cacheCtx.ttl().unregister());
+                        createIndexPageStore(grpId, workDir, pageStoreFactory, partDataRegion, val -> {
+                        }); //TODO Allocated tracker.
 
-                    // Technically wal is already disabled, but "PageHandler.isWalDeltaRecordNeeded" doesn't care and
-                    // WAL records will be allocated anyway just to be ignored later if we don't disable WAL for
-                    // cache group explicitly.
-                    oldGrpCtx.localWalEnabled(false, false);
+                        checkCancellation();
 
-                    boolean encrypted = oldGrpCtx.config().isEncryptionEnabled();
+                        GridCompoundFuture<Object, Object> cmpFut = new GridCompoundFuture<>();
 
-                    FilePageStoreFactory pageStoreFactory = filePageStoreMgr.getPageStoreFactory(grpId, encrypted);
+                        PageMemoryEx oldPageMem = (PageMemoryEx)oldGrpCtx.dataRegion().pageMemory();
 
-                    createIndexPageStore(grpId, workDir, pageStoreFactory, partDataRegion, val -> {
-                    }); //TODO Allocated tracker.
+                        CacheGroupContext newGrpCtx = new CacheGroupContext(
+                            sharedCtx,
+                            grpId,
+                            oldGrpCtx.receivedFrom(),
+                            CacheType.USER,
+                            oldGrpCtx.config(),
+                            oldGrpCtx.affinityNode(),
+                            partDataRegion,
+                            oldGrpCtx.cacheObjectContext(),
+                            null,
+                            null,
+                            oldGrpCtx.localStartVersion(),
+                            true,
+                            false,
+                            true
+                        );
 
-                    GridCompoundFuture<Object, Object> cmpFut = new GridCompoundFuture<>();
+                        defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadLock();
 
-                    PageMemoryEx oldPageMem = (PageMemoryEx)oldGrpCtx.dataRegion().pageMemory();
+                        try {
+                            // This will initialize partition meta in index partition - meta tree and reuse list.
+                            newGrpCtx.start();
+                        }
+                        finally {
+                            defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock();
+                        }
 
-                    CacheGroupContext newGrpCtx = new CacheGroupContext(
-                        sharedCtx,
-                        grpId,
-                        oldGrpCtx.receivedFrom(),
-                        CacheType.USER,
-                        oldGrpCtx.config(),
-                        oldGrpCtx.affinityNode(),
-                        partDataRegion,
-                        oldGrpCtx.cacheObjectContext(),
-                        null,
-                        null,
-                        oldGrpCtx.localStartVersion(),
-                        true,
-                        false,
-                        true
-                    );
+                        IntMap<LinkMap> linkMapByPart = new IntHashMap<>();
 
-                    defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadLock();
+                        for (CacheDataStore oldCacheDataStore : oldCacheDataStores) {
+                            checkCancellation();
 
-                    try {
-                        // This will initialize partition meta in index partition - meta tree and reuse list.
-                        newGrpCtx.start();
-                    }
-                    finally {
-                        defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock();
-                    }
+                            int partId = oldCacheDataStore.partId();
 
-                    IntMap<LinkMap> linkMapByPart = new IntHashMap<>();
+                            PartitionContext partCtx = new PartitionContext(
+                                workDir,
+                                grpId,
+                                partId,
+                                partDataRegion,
+                                mappingDataRegion,
+                                oldGrpCtx,
+                                newGrpCtx,
+                                cacheDataStores.get(partId),
+                                pageStoreFactory
+                            );
 
-                    for (CacheDataStore oldCacheDataStore : oldCacheDataStores) {
-                        int partId = oldCacheDataStore.partId();
+                            if (skipAlreadyDefragmentedPartition(workDir, grpId, partId, log)) {
+                                partCtx.createPageStore(
+                                    () -> defragmentedPartMappingFile(workDir, partId).toPath(),
+                                    partCtx.mappingPagesAllocated,
+                                    partCtx.mappingPageMemory
+                                );
 
-                        PartitionContext partCtx = new PartitionContext(
-                            workDir,
-                            grpId,
-                            partId,
-                            partDataRegion,
-                            mappingDataRegion,
-                            oldGrpCtx,
-                            newGrpCtx,
-                            cacheDataStores.get(partId),
-                            pageStoreFactory
-                        );
+                                linkMapByPart.put(partId, partCtx.createLinkMapTree(false));
+
+                                continue;
+                            }
 
-                        if (skipAlreadyDefragmentedPartition(workDir, grpId, partId, log)) {
                             partCtx.createPageStore(
                                 () -> defragmentedPartMappingFile(workDir, partId).toPath(),
                                 partCtx.mappingPagesAllocated,
                                 partCtx.mappingPageMemory
                             );
 
-                            linkMapByPart.put(partId, partCtx.createLinkMapTree(false));
+                            linkMapByPart.put(partId, partCtx.createLinkMapTree(true));
 
-                            continue;
-                        }
-
-                        partCtx.createPageStore(
-                            () -> defragmentedPartMappingFile(workDir, partId).toPath(),
-                            partCtx.mappingPagesAllocated,
-                            partCtx.mappingPageMemory
-                        );
+                            checkCancellation();
 
-                        linkMapByPart.put(partId, partCtx.createLinkMapTree(true));
+                            partCtx.createPageStore(
+                                () -> defragmentedPartTmpFile(workDir, partId).toPath(),
+                                partCtx.partPagesAllocated,
+                                partCtx.partPageMemory
+                            );
 
-                        partCtx.createPageStore(
-                            () -> defragmentedPartTmpFile(workDir, partId).toPath(),
-                            partCtx.partPagesAllocated,
-                            partCtx.partPageMemory
-                        );
+                            partCtx.createNewCacheDataStore(offheap);
 
-                        partCtx.createNewCacheDataStore(offheap);
+                            copyPartitionData(partCtx, treeIter);
 
-                        copyPartitionData(partCtx, treeIter);
+                            IgniteInClosure<IgniteInternalFuture<?>> cpLsnr = fut -> {
+                                if (fut.error() != null)
+                                    return;
 
-                        IgniteInClosure<IgniteInternalFuture<?>> cpLsnr = fut -> {
-                            if (fut.error() != null)
-                                return;
+                                PageStore oldPageStore = null;
 
-                            PageStore oldPageStore = null;
+                                try {
+                                    oldPageStore = filePageStoreMgr.getStore(grpId, partId);
+                                }
+                                catch (IgniteCheckedException ignore) {
+                                }
 
-                            try {
-                                oldPageStore = filePageStoreMgr.getStore(grpId, partId);
-                            }
-                            catch (IgniteCheckedException ignore) {
-                            }
+                                assert oldPageStore != null;
 
-                            if (log.isDebugEnabled()) {
-                                log.debug(S.toString(
-                                    "Partition defragmented",
-                                    "grpId", grpId, false,
-                                    "partId", partId, false,
-                                    "oldPages", oldPageStore.pages(), false,
-                                    "newPages", partCtx.partPagesAllocated.get() + 1, false,
-                                    "mappingPages", partCtx.mappingPagesAllocated.get() + 1, false,
-                                    "pageSize", pageSize, false,
-                                    "partFile", defragmentedPartFile(workDir, partId).getName(), false,
-                                    "workDir", workDir, false
-                                ));
-                            }
+                                if (log.isDebugEnabled()) {
+                                    log.debug(S.toString(
+                                        "Partition defragmented",
+                                        "grpId", grpId, false,
+                                        "partId", partId, false,
+                                        "oldPages", oldPageStore.pages(), false,
+                                        "newPages", partCtx.partPagesAllocated.get() + 1, false,
+                                        "mappingPages", partCtx.mappingPagesAllocated.get() + 1, false,
+                                        "pageSize", pageSize, false,
+                                        "partFile", defragmentedPartFile(workDir, partId).getName(), false,
+                                        "workDir", workDir, false
+                                    ));
+                                }
 
-                            oldPageMem.invalidate(grpId, partId);
+                                oldPageMem.invalidate(grpId, partId);
 
-                            partCtx.partPageMemory.invalidate(grpId, partId);
+                                partCtx.partPageMemory.invalidate(grpId, partId);
 
-                            DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partCtx.partPageMemory.pageManager();
+                                DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partCtx.partPageMemory.pageManager();
 
-                            pageMgr.pageStoreMap().removePageStore(grpId, partId); // Yes, it'll be invalid in a second.
+                                pageMgr.pageStoreMap().removePageStore(grpId, partId); // Yes, it'll be invalid in a second.
 
-                            renameTempPartitionFile(workDir, partId);
-                        };
+                                renameTempPartitionFile(workDir, partId);
+                            };
 
-                        GridFutureAdapter<?> cpFut = defragmentationCheckpoint
-                            .forceCheckpoint("partition defragmented", null)
-                            .futureFor(CheckpointState.FINISHED);
+                            GridFutureAdapter<?> cpFut = defragmentationCheckpoint
+                                .forceCheckpoint("partition defragmented", null)
+                                .futureFor(CheckpointState.FINISHED);
 
-                        cpFut.listen(cpLsnr);
+                            cpFut.listen(cpLsnr);
 
-                        cmpFut.add((IgniteInternalFuture<Object>)cpFut);
-                    }
+                            cmpFut.add((IgniteInternalFuture<Object>)cpFut);
+                        }
 
-                    // A bit too general for now, but I like it more then saving only the last checkpoint future.
-                    cmpFut.markInitialized().get();
+                        // A bit too general for now, but I like it more then saving only the last checkpoint future.
+                        cmpFut.markInitialized().get();
 
-                    idxDfrgFut = new GridFinishedFuture<>();
+                        idxDfrgFut = new GridFinishedFuture<>();
 
-                    if (filePageStoreMgr.hasIndexStore(grpId)) {
-                        defragmentIndexPartition(oldGrpCtx, newGrpCtx, linkMapByPart);
+                        if (filePageStoreMgr.hasIndexStore(grpId)) {
+                            defragmentIndexPartition(oldGrpCtx, newGrpCtx, linkMapByPart);
 
-                        idxDfrgFut = defragmentationCheckpoint
-                            .forceCheckpoint("index defragmented", null)
-                            .futureFor(CheckpointState.FINISHED);
-                    }
+                            idxDfrgFut = defragmentationCheckpoint
+                                .forceCheckpoint("index defragmented", null)
+                                .futureFor(CheckpointState.FINISHED);
+                        }
 
-                    idxDfrgFut.listen(fut -> {
-                        oldPageMem.invalidate(grpId, PageIdAllocator.INDEX_PARTITION);
+                        idxDfrgFut = idxDfrgFut.chain(fut -> {
+                            oldPageMem.invalidate(grpId, PageIdAllocator.INDEX_PARTITION);
 
-                        PageMemoryEx partPageMem = (PageMemoryEx)partDataRegion.pageMemory();
+                            PageMemoryEx partPageMem = (PageMemoryEx)partDataRegion.pageMemory();
 
-                        partPageMem.invalidate(grpId, PageIdAllocator.INDEX_PARTITION);
+                            partPageMem.invalidate(grpId, PageIdAllocator.INDEX_PARTITION);
 
-                        DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partPageMem.pageManager();
+                            DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partPageMem.pageManager();
 
-                        pageMgr.pageStoreMap().removePageStore(grpId, PageIdAllocator.INDEX_PARTITION);
+                            pageMgr.pageStoreMap().removePageStore(grpId, PageIdAllocator.INDEX_PARTITION);
 
-                        PageMemoryEx mappingPageMem = (PageMemoryEx)mappingDataRegion.pageMemory();
+                            PageMemoryEx mappingPageMem = (PageMemoryEx)mappingDataRegion.pageMemory();
 
-                        pageMgr = (DefragmentationPageReadWriteManager)mappingPageMem.pageManager();
+                            pageMgr = (DefragmentationPageReadWriteManager)mappingPageMem.pageManager();
 
-                        pageMgr.pageStoreMap().clear(grpId);
+                            pageMgr.pageStoreMap().clear(grpId);
 
-                        renameTempIndexFile(workDir);
+                            renameTempIndexFile(workDir);
 
-                        writeDefragmentationCompletionMarker(filePageStoreMgr.getPageStoreFileIoFactory(), workDir, log);
+                            writeDefragmentationCompletionMarker(filePageStoreMgr.getPageStoreFileIoFactory(), workDir, log);
 
-                        batchRenameDefragmentedCacheGroupPartitions(workDir, log);
-                    });
+                            batchRenameDefragmentedCacheGroupPartitions(workDir, log);
+                            return null;
+                        });
+                    }
                 }
+                catch (DefragmentationCancelledException e) {
+                    DefragmentationFileUtils.deleteLeftovers(workDir);
 
-                // I guess we should wait for it?
-                if (idxDfrgFut != null)
-                    idxDfrgFut.get();
+                    throw e;
+                }
             }
 
+            if (idxDfrgFut != null)
+                idxDfrgFut.get();
+
             mntcReg.unregisterMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME);
 
             log.info("Defragmentation completed. All partitions are defragmented.");
+
+            completionFut.onDone();
+        }
+        catch (DefragmentationCancelledException e) {
+            mntcReg.unregisterMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME);
+
+            log.info("Defragmentation has been cancelled.");
+
+            completionFut.onDone();
+        }
+        catch (Throwable t) {
+            completionFut.onDone(t);
+
+            throw t;
         }
         finally {
             defragmentationCheckpoint.stop(true);
@@ -439,6 +482,11 @@ public class CachePartitionDefragmentationManager {
     }
 
     /** */
+    public IgniteInternalFuture<?> completionFuture() {
+        return completionFut.chain(future -> null);
+    }
+
+    /** */
     public void createIndexPageStore(
         int grpId,
         File workDir,
@@ -476,6 +524,39 @@ public class CachePartitionDefragmentationManager {
     }
 
     /**
+     * Cancel the process of defragmentation.
+     *
+     * @return {@code true} if process was cancelled by this method.
+     */
+    public boolean cancel() {
+        if (completionFut.isDone())
+            return false;
+
+        if (cancel.compareAndSet(false, true)) {
+            try {
+                completionFut.get();
+            }
+            catch (Throwable ignore) {
+            }
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /** */
+    private void checkCancellation() throws DefragmentationCancelledException {
+        if (cancel.get())
+            throw new DefragmentationCancelledException();
+    }
+
+    /** */
+    public String status() {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /**
      * Defragmentate partition.
      *
      * @param partCtx
@@ -501,6 +582,8 @@ public class CachePartitionDefragmentationManager {
             AtomicInteger entriesProcessed = new AtomicInteger();
 
             treeIter.iterate(tree, partCtx.cachePageMemory, (tree0, io, pageAddr, idx) -> {
+                checkCancellation();
+
                 if (System.currentTimeMillis() - lastCpLockTs.get() >= cpLockThreshold) {
                     defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock();
 
@@ -541,6 +624,8 @@ public class CachePartitionDefragmentationManager {
                 return true;
             });
 
+            checkCancellation();
+
             defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock();
 
             defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadLock();
@@ -673,12 +758,15 @@ public class CachePartitionDefragmentationManager {
 
         CheckpointTimeoutLock cpLock = defragmentationCheckpoint.checkpointTimeoutLock();
 
+        Runnable cancellationChecker = this::checkCancellation;
+
         idx.defragment(
             grpCtx,
             newCtx,
             (PageMemoryEx)partDataRegion.pageMemory(),
             mappingByPartition,
-            cpLock
+            cpLock,
+            cancellationChecker
         );
     }
 
@@ -824,4 +912,10 @@ public class CachePartitionDefragmentationManager {
             this.newCacheDataStore = newCacheDataStore;
         }
     }
+
+    /** */
+    private static class DefragmentationCancelledException extends RuntimeException {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationFileUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationFileUtils.java
index b4273cd..214e17d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationFileUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationFileUtils.java
@@ -81,16 +81,7 @@ public class DefragmentationFileUtils {
 
             U.delete(defragmentationCompletionMarkerFile(workDir));
 
-            for (File file : workDir.listFiles()) {
-                String fileName = file.getName();
-
-                if (
-                    fileName.startsWith(DFRG_PARTITION_FILE_PREFIX)
-                    || fileName.startsWith(DFRG_INDEX_FILE_NAME)
-                    || fileName.startsWith(DFRG_LINK_MAPPING_FILE_PREFIX)
-                )
-                    U.delete(file);
-            }
+            deleteLeftovers(workDir);
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -98,6 +89,24 @@ public class DefragmentationFileUtils {
     }
 
     /**
+     * Deletes all defragmentation related file from work directory, except for completion marker.
+     *
+     * @param workDir Cache group working directory.
+     */
+    public static void deleteLeftovers(File workDir) {
+        for (File file : workDir.listFiles()) {
+            String fileName = file.getName();
+
+            if (
+                fileName.startsWith(DFRG_PARTITION_FILE_PREFIX)
+                    || fileName.startsWith(DFRG_INDEX_FILE_NAME)
+                    || fileName.startsWith(DFRG_LINK_MAPPING_FILE_PREFIX)
+            )
+                U.delete(file);
+        }
+    }
+
+    /**
      * Checks whether cache group defragmentation completed or not. Completes it if all that's left is renaming.
      *
      * @param workDir Cache group working directory.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationParameters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationParameters.java
index 6bc3ddc..499e247 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationParameters.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationParameters.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.ignite.maintenance.MaintenanceTask;
@@ -29,31 +30,31 @@ import static org.apache.ignite.internal.processors.cache.persistence.defragment
  */
 public class DefragmentationParameters {
     /** */
-    public static final String CACHE_GROUP_ID_SEPARATOR = ",";
+    public static final String SEPARATOR = "/";
 
     /** */
-    private final List<Integer> cacheGrpIds;
+    private final List<String> cacheNames;
 
     /**
-     * @param cacheGrpIds Id of cache group for defragmentations.
+     * @param cacheNames Names of caches for defragmentations.
      */
-    private DefragmentationParameters(List<Integer> cacheGrpIds) {
-        this.cacheGrpIds = cacheGrpIds;
+    private DefragmentationParameters(List<String> cacheNames) {
+        this.cacheNames = cacheNames;
     }
 
     /**
      * Convert parameter to maintenance storage.
      *
-     * @param cacheGroupIds Cache group ids for defragmentation.
+     * @param cacheNames Names of caches for defragmentations.
      * @return Maintenance task.
      */
-    public static MaintenanceTask toStore(List<Integer> cacheGroupIds) {
+    public static MaintenanceTask toStore(List<String> cacheNames) {
         return new MaintenanceTask(
             DEFRAGMENTATION_MNTC_TASK_NAME,
-            "Cache group defragmentation",
-            cacheGroupIds.stream()
+            "Caches defragmentation",
+            cacheNames.stream()
                 .map(String::valueOf)
-                .collect(Collectors.joining(CACHE_GROUP_ID_SEPARATOR))
+                .collect(Collectors.joining(SEPARATOR))
         );
     }
 
@@ -62,17 +63,20 @@ public class DefragmentationParameters {
      * @return Defragmentation parameters.
      */
     public static DefragmentationParameters fromStore(MaintenanceTask rawTask) {
+        if (rawTask.parameters() == null)
+            return new DefragmentationParameters(Collections.emptyList());
+
         return new DefragmentationParameters(Arrays.stream(rawTask.parameters()
-            .split(CACHE_GROUP_ID_SEPARATOR))
-            .map(Integer::valueOf)
+            .split(SEPARATOR))
             .collect(Collectors.toList())
         );
     }
 
     /**
-     * @return Cache groups ids.
+     * @return Cache names.
      */
-    public List<Integer> cacheGroupIds() {
-        return cacheGrpIds;
+    @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+    public List<String> cacheNames() {
+        return cacheNames;
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationWorkflowCallback.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationWorkflowCallback.java
index a809579..b3bbe51 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationWorkflowCallback.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationWorkflowCallback.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.function.Function;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager;
+import org.apache.ignite.internal.processors.failure.FailureProcessor;
 import org.apache.ignite.maintenance.MaintenanceAction;
 import org.apache.ignite.maintenance.MaintenanceWorkflowCallback;
 import org.jetbrains.annotations.NotNull;
@@ -37,16 +38,22 @@ public class DefragmentationWorkflowCallback implements MaintenanceWorkflowCallb
     /** Logger provider. */
     private final Function<Class<?>, IgniteLogger> logProvider;
 
+    /** Failure processor. */
+    private final FailureProcessor failureProc;
+
     /**
      * @param logProvider Logger provider.
      * @param defrgMgr Defragmentation manager.
+     * @param failureProc Failure processor.
      */
     public DefragmentationWorkflowCallback(
         Function<Class<?>, IgniteLogger> logProvider,
-        CachePartitionDefragmentationManager defrgMgr
+        CachePartitionDefragmentationManager defrgMgr,
+        FailureProcessor failureProc
     ) {
         this.defrgMgr = defrgMgr;
         this.logProvider = logProvider;
+        this.failureProc = failureProc;
     }
 
     /** {@inheritDoc} */
@@ -56,11 +63,11 @@ public class DefragmentationWorkflowCallback implements MaintenanceWorkflowCallb
 
     /** {@inheritDoc} */
     @Override public @NotNull List<MaintenanceAction<?>> allActions() {
-        return Collections.singletonList(automaticAction());
+        return Collections.singletonList(new StopDefragmentationAction(defrgMgr));
     }
 
     /** {@inheritDoc} */
     @Override public @Nullable MaintenanceAction<Boolean> automaticAction() {
-        return new ExecuteDefragmentationAction(logProvider, defrgMgr);
+        return new ExecuteDefragmentationAction(logProvider, defrgMgr, failureProc);
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/ExecuteDefragmentationAction.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/ExecuteDefragmentationAction.java
index 42b2de7..0758772 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/ExecuteDefragmentationAction.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/ExecuteDefragmentationAction.java
@@ -21,7 +21,10 @@ import java.util.function.Function;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager;
+import org.apache.ignite.internal.processors.failure.FailureProcessor;
 import org.apache.ignite.maintenance.MaintenanceAction;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -36,29 +39,53 @@ class ExecuteDefragmentationAction implements MaintenanceAction<Boolean> {
     /** Defragmentation manager. */
     private final CachePartitionDefragmentationManager defrgMgr;
 
+    /** Failure processor. */
+    private final FailureProcessor failureProc;
+
     /**
      * @param logFunction Logger provider.
      * @param defrgMgr Defragmentation manager.
+     * @param failureProc Failure processor.
      */
     public ExecuteDefragmentationAction(
         Function<Class<?>, IgniteLogger> logFunction,
-        CachePartitionDefragmentationManager defrgMgr
+        CachePartitionDefragmentationManager defrgMgr,
+        FailureProcessor failureProc
     ) {
         this.log = logFunction.apply(ExecuteDefragmentationAction.class);
         this.defrgMgr = defrgMgr;
+        this.failureProc = failureProc;
     }
 
     /** {@inheritDoc} */
     @Override public Boolean execute() {
         try {
-            defrgMgr.executeDefragmentation();
+            defrgMgr.beforeDefragmentation();
         }
         catch (IgniteCheckedException | IgniteException e) {
-            log.error("Defragmentation is failed", e);
+            log.error("Checkpoint before defragmentation failed", e);
 
             return false;
         }
 
+        Thread defrgThread = new Thread(() -> {
+            try {
+                defrgMgr.executeDefragmentation();
+            }
+            catch (Throwable e) {
+                log.error("Defragmentation failed", e);
+
+                // TODO Check other options.
+                failureProc.process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+            }
+        });
+
+        defrgThread.setName("defragmentation-thread");
+
+        defrgThread.setDaemon(true);
+
+        defrgThread.start();
+
         return true;
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/ExecuteDefragmentationAction.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/StopDefragmentationAction.java
similarity index 56%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/ExecuteDefragmentationAction.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/StopDefragmentationAction.java
index 42b2de7..4b40b9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/ExecuteDefragmentationAction.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/StopDefragmentationAction.java
@@ -17,58 +17,37 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance;
 
-import java.util.function.Function;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager;
 import org.apache.ignite.maintenance.MaintenanceAction;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Action which allows to start the defragmentation process.
+ * Action which allows to stop the defragmentation at any time from maintenance mode processor.
  */
-class ExecuteDefragmentationAction implements MaintenanceAction<Boolean> {
-    /** Logger. */
-    private final IgniteLogger log;
-
+class StopDefragmentationAction implements MaintenanceAction<Boolean> {
     /** Defragmentation manager. */
-    private final CachePartitionDefragmentationManager defrgMgr;
+    private final CachePartitionDefragmentationManager defragmentationMgr;
 
     /**
-     * @param logFunction Logger provider.
-     * @param defrgMgr Defragmentation manager.
+     * @param defragmentationMgr Defragmentation manager.
      */
-    public ExecuteDefragmentationAction(
-        Function<Class<?>, IgniteLogger> logFunction,
-        CachePartitionDefragmentationManager defrgMgr
-    ) {
-        this.log = logFunction.apply(ExecuteDefragmentationAction.class);
-        this.defrgMgr = defrgMgr;
+    public StopDefragmentationAction(CachePartitionDefragmentationManager defragmentationMgr) {
+        this.defragmentationMgr = defragmentationMgr;
     }
 
     /** {@inheritDoc} */
     @Override public Boolean execute() {
-        try {
-            defrgMgr.executeDefragmentation();
-        }
-        catch (IgniteCheckedException | IgniteException e) {
-            log.error("Defragmentation is failed", e);
-
-            return false;
-        }
-
-        return true;
+        return defragmentationMgr.cancel();
     }
 
     /** {@inheritDoc} */
     @Override public @NotNull String name() {
-        return "execute";
+        return "stop";
     }
 
     /** {@inheritDoc} */
     @Override public @Nullable String description() {
-        return "Starting the process of defragmentation.";
+        return "Stopping the defragmentation process immediately";
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 236de0c..117fdeb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -501,6 +501,7 @@ public interface GridQueryIndexing {
      * @param partPageMem Partition page memory.
      * @param mappingByPart Mapping page memory.
      * @param cpLock Defragmentation checkpoint read lock.
+     * @param cancellationChecker Cancellation checker.
      *
      * @throws IgniteCheckedException If failed.
      */
@@ -509,6 +510,7 @@ public interface GridQueryIndexing {
         CacheGroupContext newCtx,
         PageMemoryEx partPageMem,
         IntMap<LinkMap> mappingByPart,
-        CheckpointTimeoutLock cpLock
+        CheckpointTimeoutLock cpLock,
+        Runnable cancellationChecker
     ) throws IgniteCheckedException;
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index b3b644d..bda05be 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -5803,10 +5803,24 @@ public abstract class IgniteUtils {
      * @param e Enum value to write, possibly {@code null}.
      * @throws IOException If write failed.
      */
-    public static <E extends Enum> void writeEnum(DataOutput out, E e) throws IOException {
+    public static <E extends Enum<E>> void writeEnum(DataOutput out, E e) throws IOException {
         out.writeByte(e == null ? -1 : e.ordinal());
     }
 
+    /** */
+    public static <E extends Enum<E>> E readEnum(DataInput in, Class<E> enumCls) throws IOException {
+        byte ordinal = in.readByte();
+
+        if (ordinal == (byte)-1)
+            return null;
+
+        int idx = ordinal & 0xFF;
+
+        E[] values = enumCls.getEnumConstants();
+
+        return idx < values.length ? values[idx] : null;
+    }
+
     /**
      * Gets collection value by index.
      *
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationOperation.java
new file mode 100644
index 0000000..9cc5eab
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationOperation.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.defragmentation;
+
+/** */
+public enum VisorDefragmentationOperation {
+    /** */
+    SCHEDULE,
+    /** */
+    STATUS,
+    /** */
+    CANCEL
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTask.java
new file mode 100644
index 0000000..14cea62
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTask.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.defragmentation;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.processors.task.GridVisorManagementTask;
+import org.apache.ignite.internal.visor.VisorJob;
+import org.apache.ignite.internal.visor.VisorMultiNodeTask;
+import org.apache.ignite.maintenance.MaintenanceAction;
+import org.apache.ignite.maintenance.MaintenanceRegistry;
+import org.apache.ignite.maintenance.MaintenanceTask;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.DEFRAGMENTATION_MNTC_TASK_NAME;
+import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance.DefragmentationParameters.toStore;
+
+/** */
+@GridInternal
+@GridVisorManagementTask
+public class VisorDefragmentationTask extends VisorMultiNodeTask
+    <VisorDefragmentationTaskArg, VisorDefragmentationTaskResult, VisorDefragmentationTaskResult>
+{
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override protected VisorJob<VisorDefragmentationTaskArg, VisorDefragmentationTaskResult> job(
+        VisorDefragmentationTaskArg arg
+    ) {
+        return new VisorDefragmentationJob(arg, debug);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override protected VisorDefragmentationTaskResult reduce0(List<ComputeJobResult> results) {
+        if (taskArg.operation() == VisorDefragmentationOperation.SCHEDULE) {
+            StringBuilder msg = new StringBuilder();
+
+            for (ComputeJobResult res : results) {
+                msg.append(res.getNode().consistentId()).append(":\n");
+
+                if (res.getData() == null)
+                    msg.append("    err=").append(res.getException()).append('\n');
+                else {
+                    VisorDefragmentationTaskResult data = res.getData();
+
+                    msg.append("    success=").append(data.isSuccess()).append('\n');
+                    msg.append("    msg=").append(data.getMessage()).append('\n');
+                }
+            }
+
+            return new VisorDefragmentationTaskResult(true, msg.toString());
+        }
+
+        assert results.size() == 1;
+
+        ComputeJobResult res = results.get(0);
+
+        if (res.getException() == null)
+            return res.getData();
+
+        throw res.getException();
+    }
+
+    /** */
+    private static class VisorDefragmentationJob extends VisorJob<VisorDefragmentationTaskArg, VisorDefragmentationTaskResult> {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * Create job with specified argument.
+         *
+         * @param arg Job argument.
+         * @param debug Flag indicating whether debug information should be printed into node log.
+         */
+        protected VisorDefragmentationJob(@Nullable VisorDefragmentationTaskArg arg, boolean debug) {
+            super(arg, debug);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected VisorDefragmentationTaskResult run(
+            @Nullable VisorDefragmentationTaskArg arg
+        ) throws IgniteException {
+            switch (arg.operation()) {
+                case SCHEDULE:
+                    return runSchedule(arg);
+
+                case STATUS:
+                    return runStatus(arg);
+
+                case CANCEL:
+                    return runCancel(arg);
+            }
+
+            throw new IllegalArgumentException("Operation: " + arg.operation());
+        }
+
+        /** */
+        private VisorDefragmentationTaskResult runSchedule(VisorDefragmentationTaskArg arg) {
+            MaintenanceRegistry mntcReg = ignite.context().maintenanceRegistry();
+
+            MaintenanceTask oldTask;
+
+            try {
+                List<String> cacheNames = arg.cacheNames();
+
+                oldTask = mntcReg.registerMaintenanceTask(toStore(cacheNames == null ? Collections.emptyList() : cacheNames));
+            }
+            catch (IgniteCheckedException e) {
+                return new VisorDefragmentationTaskResult(false, "Scheduling failed: " + e.getMessage());
+            }
+
+            return new VisorDefragmentationTaskResult(
+                true,
+                "Scheduling completed successfully." +
+                (oldTask == null ? "" : " Previously scheduled task has been removed.")
+            );
+        }
+
+        /** */
+        private VisorDefragmentationTaskResult runStatus(VisorDefragmentationTaskArg arg) {
+            MaintenanceRegistry mntcReg = ignite.context().maintenanceRegistry();
+
+            if (!mntcReg.isMaintenanceMode())
+                return new VisorDefragmentationTaskResult(false, "Node is not in maintenance node.");
+
+            IgniteCacheDatabaseSharedManager dbMgr = ignite.context().cache().context().database();
+
+            assert dbMgr instanceof GridCacheDatabaseSharedManager;
+
+            CachePartitionDefragmentationManager defrgMgr = ((GridCacheDatabaseSharedManager)dbMgr)
+                .defragmentationManager();
+
+            if (defrgMgr == null)
+                return new VisorDefragmentationTaskResult(true, "There's no active defragmentation process on the node.");
+
+            return new VisorDefragmentationTaskResult(true, defrgMgr.status());
+        }
+
+        /** */
+        private VisorDefragmentationTaskResult runCancel(VisorDefragmentationTaskArg arg) {
+            assert arg.cacheNames() == null : "Cancelling specific caches is not yet implemented";
+
+            MaintenanceRegistry mntcReg = ignite.context().maintenanceRegistry();
+
+            if (!mntcReg.isMaintenanceMode()) {
+                boolean deleted = mntcReg.unregisterMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME);
+
+                String msg = deleted
+                    ? "Scheduled defragmentation task cancelled successfully."
+                    : "Scheduled defragmentation task is not found.";
+
+                return new VisorDefragmentationTaskResult(true, msg);
+            }
+            else {
+                List<MaintenanceAction<?>> actions;
+
+                try {
+                    actions = mntcReg.actionsForMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME);
+                }
+                catch (IgniteException e) {
+                    return new VisorDefragmentationTaskResult(true, "Defragmentation is already completed or has been cancelled previously.");
+                }
+
+                Optional<MaintenanceAction<?>> stopAct = actions.stream().filter(a -> "stop".equals(a.name())).findAny();
+
+                assert stopAct.isPresent();
+
+                try {
+                    Object res = stopAct.get().execute();
+
+                    assert res instanceof Boolean;
+
+                    boolean cancelled = (Boolean)res;
+
+                    String msg = cancelled
+                        ? "Defragmentation cancelled successfully."
+                        : "Defragmentation is already completed or has been cancelled previously.";
+
+                    return new VisorDefragmentationTaskResult(true, msg);
+                }
+                catch (Exception e) {
+                    return new VisorDefragmentationTaskResult(false, "Exception occurred: " + e.getMessage());
+                }
+            }
+        }
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskArg.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskArg.java
new file mode 100644
index 0000000..1b1c8b1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskArg.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.defragmentation;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.List;
+import org.apache.ignite.internal.dto.IgniteDataTransferObject;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/** */
+@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+public class VisorDefragmentationTaskArg extends IgniteDataTransferObject {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private VisorDefragmentationOperation operation;
+
+    /** */
+    private List<String> nodeIds;
+
+    /** */
+    private List<String> cacheNames;
+
+    /** Empty constructor for serialization. */
+    public VisorDefragmentationTaskArg() {
+        // No-op.
+    }
+
+    /** */
+    public VisorDefragmentationTaskArg(
+        VisorDefragmentationOperation operation,
+        List<String> nodeIds,
+        List<String> cacheNames
+    ) {
+
+        this.operation = operation;
+        this.nodeIds = nodeIds;
+        this.cacheNames = cacheNames;
+    }
+
+    /** */
+    public VisorDefragmentationOperation operation() {
+        return operation;
+    }
+
+    /** */
+    public List<String> nodeIds() {
+        return nodeIds;
+    }
+
+    /** */
+    public List<String> cacheNames() {
+        return cacheNames;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeExternalData(ObjectOutput out) throws IOException {
+        U.writeEnum(out, operation);
+
+        U.writeCollection(out, nodeIds);
+
+        U.writeCollection(out, cacheNames);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException {
+        operation = U.readEnum(in, VisorDefragmentationOperation.class);
+
+        nodeIds = U.readList(in);
+
+        cacheNames = U.readList(in);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskResult.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskResult.java
new file mode 100644
index 0000000..4575ac2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskResult.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.defragmentation;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.dto.IgniteDataTransferObject;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/** */
+public class VisorDefragmentationTaskResult extends IgniteDataTransferObject {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private boolean success;
+
+    /** */
+    private String msg;
+
+    /** Empty constructor for serialization. */
+    public VisorDefragmentationTaskResult() {
+        // No-op.
+    }
+
+    /** */
+    public VisorDefragmentationTaskResult(boolean success, String msg) {
+        this.success = success;
+
+        this.msg = msg;
+    }
+
+    /** */
+    public boolean isSuccess() {
+        return success;
+    }
+
+    /** */
+    public String getMessage() {
+        return msg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeExternalData(ObjectOutput out) throws IOException {
+        out.writeBoolean(success);
+
+        U.writeString(out, msg);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException {
+        success = in.readBoolean();
+
+        msg = U.readString(in);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/maintenance/MaintenanceRegistry.java b/modules/core/src/main/java/org/apache/ignite/maintenance/MaintenanceRegistry.java
index 9cebef0..a2cabde 100644
--- a/modules/core/src/main/java/org/apache/ignite/maintenance/MaintenanceRegistry.java
+++ b/modules/core/src/main/java/org/apache/ignite/maintenance/MaintenanceRegistry.java
@@ -104,8 +104,9 @@ public interface MaintenanceRegistry {
      * Deletes {@link MaintenanceTask} of given ID from maintenance registry.
      *
      * @param maintenanceTaskName name of {@link MaintenanceTask} to be deleted.
+     * @return {@code true} if existing task has been deleted.
      */
-    public void unregisterMaintenanceTask(String maintenanceTaskName);
+    public boolean unregisterMaintenanceTask(String maintenanceTaskName);
 
     /**
      * Returns active {@link MaintenanceTask} by its name.
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 92d01dd..f480d0f 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -2124,6 +2124,11 @@ org.apache.ignite.internal.visor.debug.VisorThreadDumpTaskResult
 org.apache.ignite.internal.visor.debug.VisorThreadInfo
 org.apache.ignite.internal.visor.debug.VisorThreadLockInfo
 org.apache.ignite.internal.visor.debug.VisorThreadMonitorInfo
+org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationTask
+org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationTask$VisorDefragmentationJob
+org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationOperation
+org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationTaskArg
+org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationTaskResult
 org.apache.ignite.internal.visor.event.VisorGridDeploymentEvent
 org.apache.ignite.internal.visor.event.VisorGridDiscoveryEvent
 org.apache.ignite.internal.visor.event.VisorGridEvent
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WalModeChangeAdvancedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WalModeChangeAdvancedSelfTest.java
index 10e1db1..9b1067a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WalModeChangeAdvancedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WalModeChangeAdvancedSelfTest.java
@@ -43,6 +43,7 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cluster.ClusterState.ACTIVE;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DATA_FILENAME;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CORRUPTED_DATA_FILES_MNTC_TASK_NAME;
 
 /**
  * Concurrent and advanced tests for WAL state change.
@@ -190,10 +191,18 @@ public class WalModeChangeAdvancedSelfTest extends WalModeChangeCommonAbstractSe
 
         cleanCacheDir(cacheToClean);
 
-        // Node should start successfully and not enter maintenance mode as MaintenanceRecord will be cleaned
+        // Node should start successfully and enter maintenance mode. MaintenanceRecord will be cleaned
         // automatically because corrupted PDS was deleted during downtime
         srv = startGrid(config(SRV_1, false, false));
-        assertFalse(srv.context().maintenanceRegistry().isMaintenanceMode());
+        assertTrue(srv.context().maintenanceRegistry().isMaintenanceMode());
+
+        try {
+            srv.context().maintenanceRegistry().actionsForMaintenanceTask(CORRUPTED_DATA_FILES_MNTC_TASK_NAME);
+
+            fail("Maintenance task is not completed yet for some reason.");
+        }
+        catch (Exception ignore) {
+        }
 
         stopAllGrids(false);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDefragmentationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDefragmentationTest.java
index 8f06a48..4ce9bb4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDefragmentationTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDefragmentationTest.java
@@ -39,6 +39,9 @@ import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteState;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.IgnitionListener;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -48,12 +51,12 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.failure.FailureHandler;
 import org.apache.ignite.failure.StopNodeFailureHandler;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.maintenance.MaintenanceFileStore;
 import org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
 import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
-import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.maintenance.MaintenanceRegistry;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -200,10 +203,25 @@ public class IgnitePdsDefragmentationTest extends GridCommonAbstractTest {
 
         startGrid(0);
 
+        waitForDefragmentation(0);
+
+        assertEquals(ClusterState.INACTIVE, grid(0).context().state().clusterState().state());
+
+        GridTestUtils.assertThrowsAnyCause(
+            log,
+            () -> {
+                grid(0).cluster().state(ClusterState.ACTIVE);
+
+                return null;
+            },
+            IgniteCheckedException.class,
+            "Failed to activate cluster (node is in maintenance mode)"
+        );
+
         long[] newPartLen = partitionSizes(workDir);
 
         for (int p = 0; p < PARTS; p++)
-            assertTrue(newPartLen[p] < oldPartLen[p]); //TODO Fails.
+            assertTrue(newPartLen[p] < oldPartLen[p]);
 
         long newIdxFileLen = new File(workDir, FilePageStoreManager.INDEX_FILE_NAME).length();
 
@@ -252,11 +270,22 @@ public class IgnitePdsDefragmentationTest extends GridCommonAbstractTest {
     }
 
     /** */
+    protected void waitForDefragmentation(int idx) throws IgniteCheckedException {
+        IgniteEx ig = grid(idx);
+
+        ((GridCacheDatabaseSharedManager)ig.context().cache().context().database())
+            .defragmentationManager()
+            .completionFuture()
+            .get();
+    }
+
+    /** */
     protected void createMaintenanceRecord() throws IgniteCheckedException {
         IgniteEx grid = grid(0);
+
         MaintenanceRegistry mntcReg = grid.context().maintenanceRegistry();
 
-        mntcReg.registerMaintenanceTask(toStore(Collections.singletonList(groupIdForCache(grid, DEFAULT_CACHE_NAME))));
+        mntcReg.registerMaintenanceTask(toStore(Collections.singletonList(DEFAULT_CACHE_NAME)));
     }
 
     /**
@@ -383,6 +412,29 @@ public class IgnitePdsDefragmentationTest extends GridCommonAbstractTest {
 
         File workDir = resolveCacheWorkDir(ig);
 
+        //Defragmentation should fail when node starts.
+        startAndAwaitNodeFail(workDir);
+
+        c.accept(workDir);
+
+        startGrid(0); // Fails here VERY rarely. WTF?
+
+        waitForDefragmentation(0);
+
+        stopGrid(0);
+
+        // Everything must be completed.
+        startGrid(0).cluster().state(ClusterState.ACTIVE);
+
+        validateCache(grid(0).cache(DEFAULT_CACHE_NAME));
+
+        validateLeftovers(workDir);
+    }
+
+    /**
+     * @throws IgniteInterruptedCheckedException If fail.
+     */
+    private void startAndAwaitNodeFail(File workDir) throws IgniteInterruptedCheckedException {
         String errMsg = "Failed to create defragmentation completion marker.";
 
         AtomicBoolean errOccurred = new AtomicBoolean();
@@ -405,34 +457,32 @@ public class IgnitePdsDefragmentationTest extends GridCommonAbstractTest {
             return cfg;
         };
 
-        try {
-            startGrid(0, cfgOp);
-        }
-        catch (Exception ignore) {
-            // No-op.
-        }
-
-        // Failed node can leave interrupted status of the thread that needs to be cleared,
-        // otherwise following "wait" wouldn't work.
-        // This call can't be moved inside of "catch" block because interruption can actually be silent.
-        Thread.interrupted();
-
-        assertTrue(GridTestUtils.waitForCondition(errOccurred::get, 10_000L));
-
-        assertTrue(GridTestUtils.waitForCondition(() -> G.allGrids().isEmpty(), 10_000L));
-
-        c.accept(workDir);
-
-        startGrid(0);
-
-        stopGrid(0);
+        AtomicBoolean nodeStopped = new AtomicBoolean();
+        IgnitionListener nodeStopListener = (name, state) -> {
+            if (name.equals(getTestIgniteInstanceName(0)) && state == IgniteState.STOPPED_ON_FAILURE)
+                nodeStopped.set(true);
+        };
 
-        // Everything must be completed.
-        startGrid(0).cluster().state(ClusterState.ACTIVE);
+        Ignition.addListener(nodeStopListener);
+        try {
+            try {
+                startGrid(0, cfgOp);
+            }
+            catch (Exception ignore) {
+                // No-op.
+            }
 
-        validateCache(grid(0).cache(DEFAULT_CACHE_NAME));
+            // Failed node can leave interrupted status of the thread that needs to be cleared,
+            // otherwise following "wait" wouldn't work.
+            // This call can't be moved inside of "catch" block because interruption can actually be silent.
+            Thread.interrupted();
 
-        validateLeftovers(workDir);
+            assertTrue(GridTestUtils.waitForCondition(errOccurred::get, 3_000L));
+            assertTrue(GridTestUtils.waitForCondition(nodeStopped::get, 3_000L));
+        }
+        finally {
+            Ignition.removeListener(nodeStopListener);
+        }
     }
 
     /** */
@@ -463,6 +513,8 @@ public class IgnitePdsDefragmentationTest extends GridCommonAbstractTest {
 
         startGrid(0);
 
+        waitForDefragmentation(0);
+
         File workDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false);
 
         AtomicReference<File> cachePartFile = new AtomicReference<>();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
index 5f0b04f..732a48a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
@@ -345,7 +345,8 @@ public class DummyQueryIndexing implements GridQueryIndexing {
         CacheGroupContext newCtx,
         PageMemoryEx partPageMem,
         IntMap<LinkMap> mappingByPart,
-        CheckpointTimeoutLock cpLock
+        CheckpointTimeoutLock cpLock,
+        Runnable cancellationChecker
     ) throws IgniteCheckedException {
         // No-op.
     }
diff --git a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output
index 5950ed1..a4c2ec4 100644
--- a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output
+++ b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output
@@ -221,6 +221,15 @@ If the file name isn't specified the output file name is: '<typeId>.bin'
   Backup data files of only given caches:
     control.(sh|bat) --persistence backup caches cache1,cache2,cache3
 
+  Schedule PDS defragmentation on given nodes for all caches:
+    control.(sh|bat) --defragmentation schedule --nodes consistentId0,consistentId1
+
+  Schedule PDS defragmentation on given nodes but only for given caches:
+    control.(sh|bat) --defragmentation schedule --nodes consistentId0,consistentId1 --caches cache1,cache2,cache3
+
+  Cancel scheduled or active PDS defragmentation on underlying node:
+    control.(sh|bat) --defragmentation cancel
+
 By default commands affecting the cluster require interactive confirmation.
 Use --yes option to disable it.
 
diff --git a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output
index 5950ed1..a4c2ec4 100644
--- a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output
+++ b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output
@@ -221,6 +221,15 @@ If the file name isn't specified the output file name is: '<typeId>.bin'
   Backup data files of only given caches:
     control.(sh|bat) --persistence backup caches cache1,cache2,cache3
 
+  Schedule PDS defragmentation on given nodes for all caches:
+    control.(sh|bat) --defragmentation schedule --nodes consistentId0,consistentId1
+
+  Schedule PDS defragmentation on given nodes but only for given caches:
+    control.(sh|bat) --defragmentation schedule --nodes consistentId0,consistentId1 --caches cache1,cache2,cache3
+
+  Cancel scheduled or active PDS defragmentation on underlying node:
+    control.(sh|bat) --defragmentation cancel
+
 By default commands affecting the cluster require interactive confirmation.
 Use --yes option to disable it.
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 517bc69..6d4b4e3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -3201,8 +3201,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         CacheGroupContext newCtx,
         PageMemoryEx partPageMem,
         IntMap<LinkMap> mappingByPart,
-        CheckpointTimeoutLock cpLock
+        CheckpointTimeoutLock cpLock,
+        Runnable cancellationChecker
     ) throws IgniteCheckedException {
-        defragmentation.defragment(grpCtx, newCtx, partPageMem, mappingByPart, cpLock, log);
+        defragmentation.defragment(grpCtx, newCtx, partPageMem, mappingByPart, cpLock, cancellationChecker, log);
     }
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/defragmentation/IndexingDefragmentation.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/defragmentation/IndexingDefragmentation.java
index c41f587..19d15c4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/defragmentation/IndexingDefragmentation.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/defragmentation/IndexingDefragmentation.java
@@ -80,6 +80,7 @@ public class IndexingDefragmentation {
      * @param partPageMem Partition page memory.
      * @param mappingByPartition Mapping page memory.
      * @param cpLock Defragmentation checkpoint read lock.
+     * @param cancellationChecker Cancellation checker.
      * @param log Log.
      *
      * @throws IgniteCheckedException If failed.
@@ -90,6 +91,7 @@ public class IndexingDefragmentation {
         PageMemoryEx partPageMem,
         IntMap<LinkMap> mappingByPartition,
         CheckpointTimeoutLock cpLock,
+        Runnable cancellationChecker,
         IgniteLogger log
     ) throws IgniteCheckedException {
         int pageSize = grpCtx.cacheObjectContext().kernalContext().grid().configuration().getDataStorageConfiguration().getPageSize();
@@ -115,6 +117,8 @@ public class IndexingDefragmentation {
                 if (cctx.groupId() != grpCtx.groupId())
                     continue; // Not our index.
 
+                cancellationChecker.run();
+
                 GridH2RowDescriptor rowDesc = table.rowDescriptor();
 
                 List<Index> indexes = table.getIndexes();
@@ -155,6 +159,8 @@ public class IndexingDefragmentation {
                     H2Tree tree = oldH2Idx.treeForRead(i);
 
                     treeIterator.iterate(tree, oldCachePageMem, (theTree, io, pageAddr, idx) -> {
+                        cancellationChecker.run();
+
                         if (System.currentTimeMillis() - lastCpLockTs.get() >= cpLockThreshold) {
                             cpLock.checkpointReadUnlock();
 
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
index bbb69ae..cf6b422 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
@@ -136,6 +136,8 @@ public class IgnitePdsIndexingDefragmentationTest extends IgnitePdsDefragmentati
 
         startGrid(0);
 
+        waitForDefragmentation(0);
+
         long newIdxFileLen = new File(workDir, FilePageStoreManager.INDEX_FILE_NAME).length();
 
         assertTrue(newIdxFileLen <= oldIdxFileLen);