You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2020/12/07 07:50:39 UTC
[ignite] branch master updated: IGNITE-13697 Schedule and cancel
control utility commands for defragmentation feature - Fixes #8449.
This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 0de72a9 IGNITE-13697 Schedule and cancel control utility commands for defragmentation feature - Fixes #8449.
0de72a9 is described below
commit 0de72a908eac64d274aafdb59dd2f5598d0c470e
Author: ibessonov <be...@gmail.com>
AuthorDate: Mon Dec 7 10:48:07 2020 +0300
IGNITE-13697 Schedule and cancel control utility commands for defragmentation feature - Fixes #8449.
Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
.../ignite/internal/commandline/CommandList.java | 5 +-
.../commandline/DefragmentationCommand.java | 241 +++++++++++
.../defragmentation/DefragmentationArguments.java | 63 +++
.../DefragmentationSubcommands.java | 68 ++++
.../commandline/CommandHandlerParsingTest.java | 3 +-
.../testsuites/IgniteControlUtilityTestSuite.java | 3 +
.../GridCommandHandlerDefragmentationTest.java | 331 +++++++++++++++
.../internal/maintenance/MaintenanceProcessor.java | 27 +-
.../GridCacheDatabaseSharedManager.java | 28 +-
.../CachePartitionDefragmentationManager.java | 444 +++++++++++++--------
.../defragmentation/DefragmentationFileUtils.java | 29 +-
.../maintenance/DefragmentationParameters.java | 34 +-
.../DefragmentationWorkflowCallback.java | 13 +-
.../maintenance/ExecuteDefragmentationAction.java | 33 +-
...nAction.java => StopDefragmentationAction.java} | 39 +-
.../processors/query/GridQueryIndexing.java | 4 +-
.../apache/ignite/internal/util/IgniteUtils.java | 16 +-
.../VisorDefragmentationOperation.java | 28 ++
.../defragmentation/VisorDefragmentationTask.java | 211 ++++++++++
.../VisorDefragmentationTaskArg.java | 91 +++++
.../VisorDefragmentationTaskResult.java | 72 ++++
.../ignite/maintenance/MaintenanceRegistry.java | 3 +-
.../main/resources/META-INF/classnames.properties | 5 +
.../cache/WalModeChangeAdvancedSelfTest.java | 13 +-
.../persistence/IgnitePdsDefragmentationTest.java | 108 +++--
.../processors/query/DummyQueryIndexing.java | 3 +-
...ridCommandHandlerClusterByClassTest_help.output | 9 +
...andHandlerClusterByClassWithSSLTest_help.output | 9 +
.../processors/query/h2/IgniteH2Indexing.java | 5 +-
.../defragmentation/IndexingDefragmentation.java | 6 +
.../IgnitePdsIndexingDefragmentationTest.java | 2 +
31 files changed, 1659 insertions(+), 287 deletions(-)
diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandList.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandList.java
index e16acaa..e9f8cd3 100644
--- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandList.java
+++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/CommandList.java
@@ -92,7 +92,10 @@ public enum CommandList {
METRIC("--metric", new MetricCommand()),
/** */
- PERSISTENCE("--persistence", new PersistenceCommand());
+ PERSISTENCE("--persistence", new PersistenceCommand()),
+
+ /** Command to manage PDS defragmentation. */
+ DEFRAGMENTATION("--defragmentation", new DefragmentationCommand());
/** Private values copy so there's no need in cloning it every time. */
private static final CommandList[] VALUES = CommandList.values();
diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java
new file mode 100644
index 0000000..c2fa8e9
--- /dev/null
+++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.commandline;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.Set;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.client.GridClient;
+import org.apache.ignite.internal.client.GridClientConfiguration;
+import org.apache.ignite.internal.client.GridClientNode;
+import org.apache.ignite.internal.commandline.defragmentation.DefragmentationArguments;
+import org.apache.ignite.internal.commandline.defragmentation.DefragmentationSubcommands;
+import org.apache.ignite.internal.visor.VisorTaskArgument;
+import org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationOperation;
+import org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationTask;
+import org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationTaskArg;
+import org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationTaskResult;
+
+import static org.apache.ignite.internal.commandline.Command.usage;
+import static org.apache.ignite.internal.commandline.CommandList.DEFRAGMENTATION;
+import static org.apache.ignite.internal.commandline.defragmentation.DefragmentationSubcommands.CANCEL;
+import static org.apache.ignite.internal.commandline.defragmentation.DefragmentationSubcommands.SCHEDULE;
+
+/** */
+public class DefragmentationCommand implements Command<DefragmentationArguments> {
+ /** */
+ private static final String NODES_ARG = "--nodes";
+
+ /** */
+ private static final String CACHES_ARG = "--caches";
+
+ /** */
+ private DefragmentationArguments args;
+
+ /** {@inheritDoc} */
+ @Override public Object execute(GridClientConfiguration clientCfg, Logger log) throws Exception {
+ try (GridClient client = Command.startClient(clientCfg)) {
+ Optional<GridClientNode> firstNodeOpt = client.compute().nodes().stream().filter(GridClientNode::connectable).findFirst();
+
+ if (firstNodeOpt.isPresent()) {
+ VisorDefragmentationTaskResult res;
+
+ if (args.nodeIds() == null) {
+ res = TaskExecutor.executeTaskByNameOnNode(
+ client,
+ VisorDefragmentationTask.class.getName(),
+ convertArguments(),
+ null, // Use node from clientCfg.
+ clientCfg
+ );
+ }
+ else {
+ VisorTaskArgument<?> visorArg = new VisorTaskArgument<>(
+ client.compute().nodes().stream().filter(
+ node -> args.nodeIds().contains(node.consistentId().toString())
+ ).map(GridClientNode::nodeId).collect(Collectors.toList()),
+ convertArguments(),
+ false
+ );
+
+ res = client.compute()
+ .projection(firstNodeOpt.get())
+ .execute(
+ VisorDefragmentationTask.class.getName(),
+ visorArg
+ );
+ }
+
+ printResult(res, log);
+ }
+ else
+ log.warning("No nodes found in topology, command won't be executed.");
+ }
+ catch (Throwable t) {
+ log.severe("Failed to execute defragmentation command='" + args.subcommand().text() + "'");
+ log.severe(CommandLogger.errorMessage(t));
+
+ throw t;
+ }
+
+ return null;
+ }
+
+ /** */
+ private void printResult(VisorDefragmentationTaskResult res, Logger log) {
+ assert res != null;
+
+ log.info(res.getMessage());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void parseArguments(CommandArgIterator argIter) {
+ DefragmentationSubcommands cmd = DefragmentationSubcommands.of(argIter.nextArg("Expected defragmentation subcommand."));
+
+ if (cmd == null || cmd == DefragmentationSubcommands.STATUS) // Status subcommand is not yet completed.
+ throw new IllegalArgumentException("Expected correct defragmentation subcommand.");
+
+ args = new DefragmentationArguments(cmd);
+
+ switch (cmd) {
+ case SCHEDULE:
+ List<String> consistentIds = null;
+ List<String> cacheNames = null;
+
+ String subarg;
+
+ do {
+ subarg = argIter.nextArg("Expected one of subcommand arguments.").toLowerCase(Locale.ENGLISH);
+
+ switch (subarg) {
+ case NODES_ARG: {
+ Set<String> ids = argIter.nextStringSet(NODES_ARG);
+
+ if (ids.isEmpty())
+ throw new IllegalArgumentException("Consistent ids list is empty.");
+
+ consistentIds = new ArrayList<>(ids);
+
+ break;
+ }
+
+ case CACHES_ARG: {
+ Set<String> ids = argIter.nextStringSet(CACHES_ARG);
+
+ if (ids.isEmpty())
+ throw new IllegalArgumentException("Caches list is empty.");
+
+ cacheNames = new ArrayList<>(ids);
+
+ break;
+ }
+
+ default:
+ subarg = null;
+ }
+ }
+ while (subarg != null);
+
+ if (consistentIds == null)
+ throw new IllegalArgumentException("--nodes argument is missing.");
+
+ args.setNodeIds(consistentIds);
+ args.setCacheNames(cacheNames);
+
+ break;
+
+ case STATUS:
+ case CANCEL:
+ // No-op.
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public DefragmentationArguments arg() {
+ return args;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void printUsage(Logger log) {
+ String consistentIds = "consistentId0,consistentId1";
+
+ String cacheNames = "cache1,cache2,cache3";
+
+ usage(
+ log,
+ "Schedule PDS defragmentation on given nodes for all caches:",
+ DEFRAGMENTATION,
+ SCHEDULE.text(),
+ NODES_ARG,
+ consistentIds
+ );
+
+ usage(
+ log,
+ "Schedule PDS defragmentation on given nodes but only for given caches:",
+ DEFRAGMENTATION,
+ SCHEDULE.text(),
+ NODES_ARG,
+ consistentIds,
+ CACHES_ARG,
+ cacheNames
+ );
+
+ usage(
+ log,
+ "Cancel scheduled or active PDS defragmentation on underlying node:",
+ DEFRAGMENTATION,
+ CANCEL.text()
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return DEFRAGMENTATION.toCommandName();
+ }
+
+ /** */
+ private VisorDefragmentationTaskArg convertArguments() {
+ return new VisorDefragmentationTaskArg(
+ convertSubcommand(args.subcommand()),
+ args.nodeIds(),
+ args.cacheNames()
+ );
+ }
+
+ /** */
+ private static VisorDefragmentationOperation convertSubcommand(DefragmentationSubcommands subcmd) {
+ switch (subcmd) {
+ case SCHEDULE:
+ return VisorDefragmentationOperation.SCHEDULE;
+
+ case STATUS:
+ return VisorDefragmentationOperation.STATUS;
+
+ case CANCEL:
+ return VisorDefragmentationOperation.CANCEL;
+
+ default:
+ throw new IllegalArgumentException(subcmd.name());
+ }
+ }
+}
diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/defragmentation/DefragmentationArguments.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/defragmentation/DefragmentationArguments.java
new file mode 100644
index 0000000..e82e578
--- /dev/null
+++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/defragmentation/DefragmentationArguments.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.commandline.defragmentation;
+
+import java.util.List;
+
+/** */
+@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+public class DefragmentationArguments {
+ /** */
+ private final DefragmentationSubcommands subcmd;
+
+ /** */
+ private List<String> nodeIds;
+
+ /** */
+ private List<String> cacheNames;
+
+ /** */
+ public DefragmentationArguments(DefragmentationSubcommands subcmd) {
+ this.subcmd = subcmd;
+ }
+
+ /** */
+ public DefragmentationSubcommands subcommand() {
+ return subcmd;
+ }
+
+ /** */
+ public void setNodeIds(List<String> nodeIds) {
+ this.nodeIds = nodeIds;
+ }
+
+ /** */
+ public List<String> nodeIds() {
+ return nodeIds;
+ }
+
+ /** */
+ public void setCacheNames(List<String> cacheNames) {
+ this.cacheNames = cacheNames;
+ }
+
+ /** */
+ public List<String> cacheNames() {
+ return cacheNames;
+ }
+}
diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/defragmentation/DefragmentationSubcommands.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/defragmentation/DefragmentationSubcommands.java
new file mode 100644
index 0000000..86ec775
--- /dev/null
+++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/defragmentation/DefragmentationSubcommands.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.commandline.defragmentation;
+
+import org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationOperation;
+import org.jetbrains.annotations.Nullable;
+
+/** */
+public enum DefragmentationSubcommands {
+ /** */
+ SCHEDULE("schedule", VisorDefragmentationOperation.SCHEDULE),
+
+ /** */
+ STATUS("status", VisorDefragmentationOperation.STATUS),
+
+ /** */
+ CANCEL("cancel", VisorDefragmentationOperation.CANCEL);
+
+ /** */
+ private final String name;
+
+ /** */
+ private final VisorDefragmentationOperation visorOperation;
+
+ /** */
+ DefragmentationSubcommands(String name, VisorDefragmentationOperation visorOperation) {
+ this.name = name;
+ this.visorOperation = visorOperation;
+ }
+
+ /**
+ * @param strRep String representation of subcommand.
+ * @return Subcommand for its string representation.
+ */
+ public static @Nullable DefragmentationSubcommands of(String strRep) {
+ for (DefragmentationSubcommands cmd : values()) {
+ if (cmd.text().equalsIgnoreCase(strRep))
+ return cmd;
+ }
+
+ return null;
+ }
+
+ /** */
+ public String text() {
+ return name;
+ }
+
+ /** */
+ public VisorDefragmentationOperation operation() {
+ return visorOperation;
+ }
+}
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java b/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java
index b3e51cd..b1ab5f8 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java
@@ -1033,6 +1033,7 @@ public class CommandHandlerParsingTest {
cmd == CommandList.WARM_UP ||
cmd == CommandList.PROPERTY ||
cmd == CommandList.SYSTEM_VIEW ||
- cmd == CommandList.METRIC;
+ cmd == CommandList.METRIC ||
+ cmd == CommandList.DEFRAGMENTATION;
}
}
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
index dbccfa5..cac12d4 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
@@ -25,6 +25,7 @@ import org.apache.ignite.util.GridCommandHandlerBrokenIndexTest;
import org.apache.ignite.util.GridCommandHandlerCheckIndexesInlineSizeTest;
import org.apache.ignite.util.GridCommandHandlerClusterByClassTest;
import org.apache.ignite.util.GridCommandHandlerClusterByClassWithSSLTest;
+import org.apache.ignite.util.GridCommandHandlerDefragmentationTest;
import org.apache.ignite.util.GridCommandHandlerIndexForceRebuildTest;
import org.apache.ignite.util.GridCommandHandlerIndexListTest;
import org.apache.ignite.util.GridCommandHandlerIndexRebuildStatusTest;
@@ -82,6 +83,8 @@ import org.junit.runners.Suite;
GridCommandHandlerPropertiesTest.class,
+ GridCommandHandlerDefragmentationTest.class,
+
SystemViewCommandTest.class,
MetricCommandTest.class
})
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerDefragmentationTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerDefragmentationTest.java
new file mode 100644
index 0000000..adce9e2
--- /dev/null
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerDefragmentationTest.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.util;
+
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.UnaryOperator;
+import java.util.logging.Formatter;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.logging.StreamHandler;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.commandline.CommandHandler;
+import org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance.DefragmentationParameters;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.maintenance.MaintenanceTask;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.junit.Test;
+
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_INVALID_ARGUMENTS;
+import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
+
+/** */
+public class GridCommandHandlerDefragmentationTest extends GridCommandHandlerClusterPerMethodAbstractTest {
+ /** */
+ private static CountDownLatch blockCdl;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.getDataStorageConfiguration().setWalSegmentSize(512 * 1024).setWalSegments(3);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDefragmentationSchedule() throws Exception {
+ Ignite ignite = startGrids(2);
+
+ ignite.cluster().state(ACTIVE);
+
+ assertEquals(EXIT_CODE_INVALID_ARGUMENTS, execute("--defragmentation", "schedule"));
+
+ String grid0ConsId = grid(0).configuration().getConsistentId().toString();
+ String grid1ConsId = grid(1).configuration().getConsistentId().toString();
+
+ ListeningTestLogger testLog = new ListeningTestLogger();
+
+ CommandHandler cmd = createCommandHandler(testLog);
+
+ LogListener logLsnr = LogListener.matches("Scheduling completed successfully.").build();
+
+ testLog.registerListener(logLsnr);
+
+ assertEquals(EXIT_CODE_OK, execute(
+ cmd,
+ "--defragmentation",
+ "schedule",
+ "--nodes",
+ grid0ConsId
+ ));
+
+ assertTrue(logLsnr.check());
+
+ MaintenanceTask mntcTask = DefragmentationParameters.toStore(Collections.emptyList());
+
+ assertNotNull(grid(0).context().maintenanceRegistry().registerMaintenanceTask(mntcTask));
+ assertNull(grid(1).context().maintenanceRegistry().registerMaintenanceTask(mntcTask));
+
+ stopGrid(0);
+ startGrid(0);
+
+ logLsnr = LogListener.matches("Node is already in Maintenance Mode").build();
+
+ testLog.clearListeners();
+
+ testLog.registerListener(logLsnr);
+
+ assertEquals(EXIT_CODE_OK, execute(
+ cmd,
+ "--defragmentation",
+ "schedule",
+ "--nodes",
+ grid0ConsId
+ ));
+
+ assertTrue(logLsnr.check());
+
+ stopGrid(0);
+ startGrid(0);
+
+ stopGrid(1);
+ startGrid(1);
+
+ stopAllGrids();
+
+ startGrids(2);
+
+ logLsnr = LogListener.matches("Scheduling completed successfully.").times(2).build();
+
+ testLog.clearListeners();
+
+ testLog.registerListener(logLsnr);
+
+ assertEquals(EXIT_CODE_OK, execute(
+ cmd,
+ "--defragmentation",
+ "schedule",
+ "--nodes",
+ String.join(",", grid0ConsId, grid1ConsId)
+ ));
+
+ assertTrue(logLsnr.check());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDefragmentationCancel() throws Exception {
+ Ignite ignite = startGrids(2);
+
+ ignite.cluster().state(ACTIVE);
+
+ String grid0ConsId = grid(0).configuration().getConsistentId().toString();
+
+ ListeningTestLogger testLog = new ListeningTestLogger();
+
+ CommandHandler cmd = createCommandHandler(testLog);
+
+ assertEquals(EXIT_CODE_OK, execute(
+ cmd,
+ "--defragmentation",
+ "schedule",
+ "--nodes",
+ grid0ConsId
+ ));
+
+ LogListener logLsnr = LogListener.matches("Scheduled defragmentation task cancelled successfully.").atLeast(1).build();
+
+ testLog.registerListener(logLsnr);
+
+ assertEquals(EXIT_CODE_OK, execute(
+ cmd,
+ "--port",
+ grid(0).localNode().attribute(IgniteNodeAttributes.ATTR_REST_TCP_PORT).toString(),
+ "--defragmentation",
+ "cancel"
+ ));
+
+ assertTrue(logLsnr.check());
+
+ testLog.clearListeners();
+
+ logLsnr = LogListener.matches("Scheduled defragmentation task is not found.").build();
+
+ testLog.registerListener(logLsnr);
+
+ assertEquals(EXIT_CODE_OK, execute(
+ cmd,
+ "--port",
+ grid(1).localNode().attribute(IgniteNodeAttributes.ATTR_REST_TCP_PORT).toString(),
+ "--defragmentation",
+ "cancel"
+ ));
+
+ assertTrue(logLsnr.check());
+ }
+
+ /** */
+ @Test
+ public void testDefragmentationCancelInProgress() throws Exception {
+ IgniteEx ig = startGrid(0);
+
+ ig.cluster().state(ClusterState.ACTIVE);
+
+ IgniteCache<Object, Object> cache = ig.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ for (int i = 0; i < 1024; i++)
+ cache.put(i, i);
+
+ forceCheckpoint(ig);
+
+ String grid0ConsId = ig.configuration().getConsistentId().toString();
+
+ ListeningTestLogger testLog = new ListeningTestLogger();
+
+ CommandHandler cmd = createCommandHandler(testLog);
+
+ assertEquals(EXIT_CODE_OK, execute(
+ cmd,
+ "--defragmentation",
+ "schedule",
+ "--nodes",
+ grid0ConsId
+ ));
+
+ String port = grid(0).localNode().attribute(IgniteNodeAttributes.ATTR_REST_TCP_PORT).toString();
+
+ stopGrid(0);
+
+ blockCdl = new CountDownLatch(128);
+
+ UnaryOperator<IgniteConfiguration> cfgOp = cfg -> {
+ DataStorageConfiguration dsCfg = cfg.getDataStorageConfiguration();
+
+ FileIOFactory delegate = dsCfg.getFileIOFactory();
+
+ dsCfg.setFileIOFactory((file, modes) -> {
+ if (file.getName().contains("dfrg")) {
+ if (blockCdl.getCount() == 0) {
+ try {
+ // Slow down defragmentation process.
+ // This'll be enough for the test since we have, like, 900 partitions left.
+ Thread.sleep(100);
+ }
+ catch (InterruptedException ignore) {
+ // No-op.
+ }
+ }
+ else
+ blockCdl.countDown();
+ }
+
+ return delegate.create(file, modes);
+ });
+
+ return cfg;
+ };
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
+ try {
+ startGrid(0, cfgOp);
+ }
+ catch (Exception e) {
+ // No-op.
+ throw new RuntimeException(e);
+ }
+ });
+
+ blockCdl.await();
+
+ LogListener logLsnr = LogListener.matches("Defragmentation cancelled successfully.").build();
+
+ testLog.registerListener(logLsnr);
+
+ assertEquals(EXIT_CODE_OK, execute(
+ cmd,
+ "--port",
+ port,
+ "--defragmentation",
+ "cancel"
+ ));
+
+ assertTrue(logLsnr.check());
+
+ fut.get();
+
+ testLog.clearListeners();
+
+ logLsnr = LogListener.matches("Defragmentation is already completed or has been cancelled previously.").build();
+
+ testLog.registerListener(logLsnr);
+
+ assertEquals(EXIT_CODE_OK, execute(
+ cmd,
+ "--port",
+ port,
+ "--defragmentation",
+ "cancel"
+ ));
+
+ assertTrue(logLsnr.check());
+ }
+
+ /** */
+ private CommandHandler createCommandHandler(ListeningTestLogger testLog) {
+ Logger log = CommandHandler.initLogger(null);
+
+ log.addHandler(new StreamHandler(System.out, new Formatter() {
+ /** {@inheritDoc} */
+ @Override public String format(LogRecord record) {
+ String msg = record.getMessage();
+
+ testLog.info(msg);
+
+ return msg + "\n";
+ }
+ }));
+
+ return new CommandHandler(log);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/maintenance/MaintenanceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/maintenance/MaintenanceProcessor.java
index 063bd47..5328d13 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/maintenance/MaintenanceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/maintenance/MaintenanceProcessor.java
@@ -61,6 +61,9 @@ public class MaintenanceProcessor extends GridProcessorAdapter implements Mainte
/** */
private final boolean inMemoryMode;
+ /** */
+ private volatile boolean maintenanceMode;
+
/**
* @param ctx Kernal context.
*/
@@ -99,7 +102,7 @@ public class MaintenanceProcessor extends GridProcessorAdapter implements Mainte
log.info(
"Maintenance Task with name " + task.name() +
" is already registered" +
- oldTask.parameters() != null ? " with parameters " + oldTask.parameters() : "" + "." +
+ (oldTask.parameters() != null ? " with parameters " + oldTask.parameters() : ".") +
" It will be replaced with new task" +
task.parameters() != null ? " with parameters " + task.parameters() : "" + "."
);
@@ -134,6 +137,8 @@ public class MaintenanceProcessor extends GridProcessorAdapter implements Mainte
fileStorage.init();
activeTasks.putAll(fileStorage.getAllTasks());
+
+ maintenanceMode = !activeTasks.isEmpty();
}
catch (Throwable t) {
log.warning("Caught exception when starting MaintenanceProcessor," +
@@ -213,18 +218,20 @@ public class MaintenanceProcessor extends GridProcessorAdapter implements Mainte
/** {@inheritDoc} */
@Override public boolean isMaintenanceMode() {
- return !activeTasks.isEmpty();
+ return maintenanceMode;
}
/** {@inheritDoc} */
- @Override public void unregisterMaintenanceTask(String maintenanceTaskName) {
+ @Override public boolean unregisterMaintenanceTask(String maintenanceTaskName) {
if (inMemoryMode)
- return;
+ return false;
+
+ boolean deleted;
if (isMaintenanceMode())
- activeTasks.remove(maintenanceTaskName);
+ deleted = activeTasks.remove(maintenanceTaskName) != null;
else
- requestedTasks.remove(maintenanceTaskName);
+ deleted = requestedTasks.remove(maintenanceTaskName) != null;
try {
fileStorage.deleteMaintenanceTask(maintenanceTaskName);
@@ -237,6 +244,8 @@ public class MaintenanceProcessor extends GridProcessorAdapter implements Mainte
fileStorage.clear();
}
+
+ return deleted;
}
/** {@inheritDoc} */
@@ -247,14 +256,14 @@ public class MaintenanceProcessor extends GridProcessorAdapter implements Mainte
List<MaintenanceAction<?>> actions = cb.allActions();
if (actions == null || actions.isEmpty())
- throw new IgniteException("Maintenance workflow callback should provide at least one mainetance action");
+ throw new IgniteException("Maintenance workflow callback should provide at least one maintenance action");
int size = actions.size();
- long distinctSize = actions.stream().map(a -> a.name()).distinct().count();
+ long distinctSize = actions.stream().map(MaintenanceAction::name).distinct().count();
if (distinctSize < size)
throw new IgniteException("All actions of a single workflow should have unique names: " +
- actions.stream().map(a -> a.name()).collect(Collectors.joining(", ")));
+ actions.stream().map(MaintenanceAction::name).collect(Collectors.joining(", ")));
Optional<String> wrongActionName = actions
.stream()
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 2c366eb..1484e30 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -748,7 +748,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/** */
- private void prepareCacheDefragmentation(List<Integer> cacheGroupIds) throws IgniteCheckedException {
+ private void prepareCacheDefragmentation(List<String> cacheNames) throws IgniteCheckedException {
GridKernalContext kernalCtx = cctx.kernalContext();
DataStorageConfiguration dsCfg = kernalCtx.config().getDataStorageConfiguration();
@@ -778,7 +778,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
lightCheckpointMgr.start();
defrgMgr = new CachePartitionDefragmentationManager(
- cacheGroupIds,
+ cacheNames,
cctx,
this,
(FilePageStoreManager)cctx.pageStore(),
@@ -788,6 +788,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
);
}
+ /** */
+ public CachePartitionDefragmentationManager defragmentationManager() {
+ return defrgMgr;
+ }
+
/** {@inheritDoc} */
@Override public DataRegion addDataRegion(DataStorageConfiguration dataStorageCfg, DataRegionConfiguration dataRegionCfg,
boolean trackable, PageReadWriteManager pmPageMgr) throws IgniteCheckedException {
@@ -826,9 +831,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
.registerWorkflowCallbackIfTaskExists(
DEFRAGMENTATION_MNTC_TASK_NAME,
task -> {
- prepareCacheDefragmentation(fromStore(task).cacheGroupIds());
+ prepareCacheDefragmentation(fromStore(task).cacheNames());
- return new DefragmentationWorkflowCallback(cctx.kernalContext()::log, defrgMgr);
+ return new DefragmentationWorkflowCallback(
+ cctx.kernalContext()::log,
+ defrgMgr,
+ cctx.kernalContext().failure()
+ );
}
);
}
@@ -1097,6 +1106,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
+ if (defrgMgr != null)
+ defrgMgr.cancel();
+
checkpointManager.stop(cancel);
super.onKernalStop0(cancel);
@@ -1395,6 +1407,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** {@inheritDoc} */
@Override public void rebuildIndexesIfNeeded(GridDhtPartitionsExchangeFuture exchangeFut) {
+ if (defrgMgr != null)
+ return;
+
rebuildIndexes(cctx.cacheContexts(), (cacheCtx) -> cacheCtx.startTopologyVersion().equals(exchangeFut.initialVersion()));
}
@@ -1887,6 +1902,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
cctx.wal().resumeLogging(walTail);
}
+ /** */
+ public void preserveWalTailPointer() throws IgniteCheckedException {
+ walTail = cctx.wal().flush(null, true);
+ }
+
/**
* @param grpId Cache group id.
* @param partId Partition ID.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
index 006fa8e..41999fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;
@@ -102,7 +103,7 @@ public class CachePartitionDefragmentationManager {
public static final String DEFRAGMENTATION_MNTC_TASK_NAME = "defragmentationMaintenanceTask";
/** */
- private final Set<Integer> cacheGroupsForDefragmentation;
+ private final Set<String> cachesForDefragmentation;
/** Cache shared context. */
private final GridCacheSharedContext<?, ?> sharedCtx;
@@ -137,8 +138,14 @@ public class CachePartitionDefragmentationManager {
/** */
private final DataRegion mappingDataRegion;
+ /** */
+ private final AtomicBoolean cancel = new AtomicBoolean();
+
+ /** */
+ private final GridFutureAdapter<?> completionFut = new GridFutureAdapter<>();
+
/**
- * @param cacheGrpIds
+ * @param cacheNames Names of caches to be defragmented. Empty means "all".
* @param sharedCtx Cache shared context.
* @param dbMgr Database manager.
* @param filePageStoreMgr File page store manager.
@@ -147,7 +154,7 @@ public class CachePartitionDefragmentationManager {
* @param pageSize Page size.
*/
public CachePartitionDefragmentationManager(
- List<Integer> cacheGrpIds,
+ List<String> cacheNames,
GridCacheSharedContext<?, ?> sharedCtx,
GridCacheDatabaseSharedManager dbMgr,
FilePageStoreManager filePageStoreMgr,
@@ -155,7 +162,7 @@ public class CachePartitionDefragmentationManager {
LightweightCheckpointManager defragmentationCheckpoint,
int pageSize
) throws IgniteCheckedException {
- cacheGroupsForDefragmentation = new HashSet<>(cacheGrpIds);
+ cachesForDefragmentation = new HashSet<>(cacheNames);
this.dbMgr = dbMgr;
this.filePageStoreMgr = filePageStoreMgr;
@@ -172,19 +179,24 @@ public class CachePartitionDefragmentationManager {
}
/** */
- public void executeDefragmentation() throws IgniteCheckedException {
- log.info("Defragmentation started.");
+ public void beforeDefragmentation() throws IgniteCheckedException {
+ // Checkpointer must be enabled so all pages on disk are in their latest valid state.
+ dbMgr.resumeWalLogging();
- try {
- // Checkpointer must be enabled so all pages on disk are in their latest valid state.
- dbMgr.resumeWalLogging();
+ dbMgr.onStateRestored(null);
- dbMgr.onStateRestored(null);
+ nodeCheckpoint.forceCheckpoint("beforeDefragmentation", null).futureFor(FINISHED).get();
- nodeCheckpoint.forceCheckpoint("beforeDefragmentation", null).futureFor(FINISHED).get();
+ dbMgr.preserveWalTailPointer();
- sharedCtx.wal().onDeActivate(sharedCtx.kernalContext());
+ sharedCtx.wal().onDeActivate(sharedCtx.kernalContext());
+ }
+
+ /** */
+ public void executeDefragmentation() throws IgniteCheckedException {
+ log.info("Defragmentation started.");
+ try {
// Now the actual process starts.
TreeIterator treeIter = new TreeIterator(pageSize);
@@ -197,241 +209,272 @@ public class CachePartitionDefragmentationManager {
int grpId = oldGrpCtx.groupId();
- if (!cacheGroupsForDefragmentation.isEmpty() && !cacheGroupsForDefragmentation.contains(grpId))
- continue;
+ if (!cachesForDefragmentation.isEmpty()) {
+ if (oldGrpCtx.caches().stream().noneMatch(cctx -> cachesForDefragmentation.contains(cctx.name())))
+ continue;
+ }
File workDir = filePageStoreMgr.cacheWorkDir(oldGrpCtx.sharedGroup(), oldGrpCtx.cacheOrGroupName());
- if (skipAlreadyDefragmentedCacheGroup(workDir, grpId, log))
- continue;
+ try {
+ if (skipAlreadyDefragmentedCacheGroup(workDir, grpId, log))
+ continue;
- GridCacheOffheapManager offheap = (GridCacheOffheapManager)oldGrpCtx.offheap();
+ GridCacheOffheapManager offheap = (GridCacheOffheapManager)oldGrpCtx.offheap();
- List<CacheDataStore> oldCacheDataStores = stream(offheap.cacheDataStores().spliterator(), false)
- .filter(store -> {
- try {
- return filePageStoreMgr.exists(grpId, store.partId());
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
+ List<CacheDataStore> oldCacheDataStores = stream(offheap.cacheDataStores().spliterator(), false)
+ .filter(store -> {
+ try {
+ return filePageStoreMgr.exists(grpId, store.partId());
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ })
+ .collect(Collectors.toList());
+
+ if (workDir != null && !oldCacheDataStores.isEmpty()) {
+ // We can't start defragmentation of new group on the region that has wrong eviction mode.
+ // So waiting of the previous cache group defragmentation is inevitable.
+ DataPageEvictionMode curPageEvictionMode = oldGrpCtx.dataRegion().config().getPageEvictionMode();
+
+ if (prevPageEvictionMode == null || prevPageEvictionMode != curPageEvictionMode) {
+ prevPageEvictionMode = curPageEvictionMode;
+
+ partDataRegion.config().setPageEvictionMode(curPageEvictionMode);
+
+ if (idxDfrgFut != null)
+ idxDfrgFut.get();
}
- })
- .collect(Collectors.toList());
- if (workDir != null && !oldCacheDataStores.isEmpty()) {
- // We can't start defragmentation of new group on the region that has wrong eviction mode.
- // So waiting of the previous cache group defragmentation is inevitable.
- DataPageEvictionMode curPageEvictionMode = oldGrpCtx.dataRegion().config().getPageEvictionMode();
+ IntMap<CacheDataStore> cacheDataStores = new IntHashMap<>();
- if (prevPageEvictionMode == null || prevPageEvictionMode != curPageEvictionMode) {
- prevPageEvictionMode = curPageEvictionMode;
+ for (CacheDataStore store : offheap.cacheDataStores()) {
+ // Tree can be null for not yet initialized partitions.
+ // This would mean that these partitions are empty.
+ assert store.tree() == null || store.tree().groupId() == grpId;
- partDataRegion.config().setPageEvictionMode(curPageEvictionMode);
+ if (store.tree() != null)
+ cacheDataStores.put(store.partId(), store);
+ }
- if (idxDfrgFut != null)
- idxDfrgFut.get();
- }
+ dbMgr.checkpointedDataRegions().remove(oldGrpCtx.dataRegion());
- IntMap<CacheDataStore> cacheDataStores = new IntHashMap<>();
+ // Another cheat. Ttl cleanup manager knows too much shit.
+ oldGrpCtx.caches().stream()
+ .filter(cacheCtx -> cacheCtx.groupId() == grpId)
+ .forEach(cacheCtx -> cacheCtx.ttl().unregister());
- for (CacheDataStore store : offheap.cacheDataStores()) {
- // Tree can be null for not yet initialized partitions.
- // This would mean that these partitions are empty.
- assert store.tree() == null || store.tree().groupId() == grpId;
+ // Technically wal is already disabled, but "PageHandler.isWalDeltaRecordNeeded" doesn't care
+ // and WAL records will be allocated anyway just to be ignored later if we don't disable WAL for
+ // cache group explicitly.
+ oldGrpCtx.localWalEnabled(false, false);
- if (store.tree() != null)
- cacheDataStores.put(store.partId(), store);
- }
+ boolean encrypted = oldGrpCtx.config().isEncryptionEnabled();
- dbMgr.checkpointedDataRegions().remove(oldGrpCtx.dataRegion());
+ FilePageStoreFactory pageStoreFactory = filePageStoreMgr.getPageStoreFactory(grpId, encrypted);
- // Another cheat. Ttl cleanup manager knows too much shit.
- oldGrpCtx.caches().stream()
- .filter(cacheCtx -> cacheCtx.groupId() == grpId)
- .forEach(cacheCtx -> cacheCtx.ttl().unregister());
+ createIndexPageStore(grpId, workDir, pageStoreFactory, partDataRegion, val -> {
+ }); //TODO Allocated tracker.
- // Technically wal is already disabled, but "PageHandler.isWalDeltaRecordNeeded" doesn't care and
- // WAL records will be allocated anyway just to be ignored later if we don't disable WAL for
- // cache group explicitly.
- oldGrpCtx.localWalEnabled(false, false);
+ checkCancellation();
- boolean encrypted = oldGrpCtx.config().isEncryptionEnabled();
+ GridCompoundFuture<Object, Object> cmpFut = new GridCompoundFuture<>();
- FilePageStoreFactory pageStoreFactory = filePageStoreMgr.getPageStoreFactory(grpId, encrypted);
+ PageMemoryEx oldPageMem = (PageMemoryEx)oldGrpCtx.dataRegion().pageMemory();
- createIndexPageStore(grpId, workDir, pageStoreFactory, partDataRegion, val -> {
- }); //TODO Allocated tracker.
+ CacheGroupContext newGrpCtx = new CacheGroupContext(
+ sharedCtx,
+ grpId,
+ oldGrpCtx.receivedFrom(),
+ CacheType.USER,
+ oldGrpCtx.config(),
+ oldGrpCtx.affinityNode(),
+ partDataRegion,
+ oldGrpCtx.cacheObjectContext(),
+ null,
+ null,
+ oldGrpCtx.localStartVersion(),
+ true,
+ false,
+ true
+ );
- GridCompoundFuture<Object, Object> cmpFut = new GridCompoundFuture<>();
+ defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadLock();
- PageMemoryEx oldPageMem = (PageMemoryEx)oldGrpCtx.dataRegion().pageMemory();
+ try {
+ // This will initialize partition meta in index partition - meta tree and reuse list.
+ newGrpCtx.start();
+ }
+ finally {
+ defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock();
+ }
- CacheGroupContext newGrpCtx = new CacheGroupContext(
- sharedCtx,
- grpId,
- oldGrpCtx.receivedFrom(),
- CacheType.USER,
- oldGrpCtx.config(),
- oldGrpCtx.affinityNode(),
- partDataRegion,
- oldGrpCtx.cacheObjectContext(),
- null,
- null,
- oldGrpCtx.localStartVersion(),
- true,
- false,
- true
- );
+ IntMap<LinkMap> linkMapByPart = new IntHashMap<>();
- defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadLock();
+ for (CacheDataStore oldCacheDataStore : oldCacheDataStores) {
+ checkCancellation();
- try {
- // This will initialize partition meta in index partition - meta tree and reuse list.
- newGrpCtx.start();
- }
- finally {
- defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock();
- }
+ int partId = oldCacheDataStore.partId();
- IntMap<LinkMap> linkMapByPart = new IntHashMap<>();
+ PartitionContext partCtx = new PartitionContext(
+ workDir,
+ grpId,
+ partId,
+ partDataRegion,
+ mappingDataRegion,
+ oldGrpCtx,
+ newGrpCtx,
+ cacheDataStores.get(partId),
+ pageStoreFactory
+ );
- for (CacheDataStore oldCacheDataStore : oldCacheDataStores) {
- int partId = oldCacheDataStore.partId();
+ if (skipAlreadyDefragmentedPartition(workDir, grpId, partId, log)) {
+ partCtx.createPageStore(
+ () -> defragmentedPartMappingFile(workDir, partId).toPath(),
+ partCtx.mappingPagesAllocated,
+ partCtx.mappingPageMemory
+ );
- PartitionContext partCtx = new PartitionContext(
- workDir,
- grpId,
- partId,
- partDataRegion,
- mappingDataRegion,
- oldGrpCtx,
- newGrpCtx,
- cacheDataStores.get(partId),
- pageStoreFactory
- );
+ linkMapByPart.put(partId, partCtx.createLinkMapTree(false));
+
+ continue;
+ }
- if (skipAlreadyDefragmentedPartition(workDir, grpId, partId, log)) {
partCtx.createPageStore(
() -> defragmentedPartMappingFile(workDir, partId).toPath(),
partCtx.mappingPagesAllocated,
partCtx.mappingPageMemory
);
- linkMapByPart.put(partId, partCtx.createLinkMapTree(false));
+ linkMapByPart.put(partId, partCtx.createLinkMapTree(true));
- continue;
- }
-
- partCtx.createPageStore(
- () -> defragmentedPartMappingFile(workDir, partId).toPath(),
- partCtx.mappingPagesAllocated,
- partCtx.mappingPageMemory
- );
+ checkCancellation();
- linkMapByPart.put(partId, partCtx.createLinkMapTree(true));
+ partCtx.createPageStore(
+ () -> defragmentedPartTmpFile(workDir, partId).toPath(),
+ partCtx.partPagesAllocated,
+ partCtx.partPageMemory
+ );
- partCtx.createPageStore(
- () -> defragmentedPartTmpFile(workDir, partId).toPath(),
- partCtx.partPagesAllocated,
- partCtx.partPageMemory
- );
+ partCtx.createNewCacheDataStore(offheap);
- partCtx.createNewCacheDataStore(offheap);
+ copyPartitionData(partCtx, treeIter);
- copyPartitionData(partCtx, treeIter);
+ IgniteInClosure<IgniteInternalFuture<?>> cpLsnr = fut -> {
+ if (fut.error() != null)
+ return;
- IgniteInClosure<IgniteInternalFuture<?>> cpLsnr = fut -> {
- if (fut.error() != null)
- return;
+ PageStore oldPageStore = null;
- PageStore oldPageStore = null;
+ try {
+ oldPageStore = filePageStoreMgr.getStore(grpId, partId);
+ }
+ catch (IgniteCheckedException ignore) {
+ }
- try {
- oldPageStore = filePageStoreMgr.getStore(grpId, partId);
- }
- catch (IgniteCheckedException ignore) {
- }
+ assert oldPageStore != null;
- if (log.isDebugEnabled()) {
- log.debug(S.toString(
- "Partition defragmented",
- "grpId", grpId, false,
- "partId", partId, false,
- "oldPages", oldPageStore.pages(), false,
- "newPages", partCtx.partPagesAllocated.get() + 1, false,
- "mappingPages", partCtx.mappingPagesAllocated.get() + 1, false,
- "pageSize", pageSize, false,
- "partFile", defragmentedPartFile(workDir, partId).getName(), false,
- "workDir", workDir, false
- ));
- }
+ if (log.isDebugEnabled()) {
+ log.debug(S.toString(
+ "Partition defragmented",
+ "grpId", grpId, false,
+ "partId", partId, false,
+ "oldPages", oldPageStore.pages(), false,
+ "newPages", partCtx.partPagesAllocated.get() + 1, false,
+ "mappingPages", partCtx.mappingPagesAllocated.get() + 1, false,
+ "pageSize", pageSize, false,
+ "partFile", defragmentedPartFile(workDir, partId).getName(), false,
+ "workDir", workDir, false
+ ));
+ }
- oldPageMem.invalidate(grpId, partId);
+ oldPageMem.invalidate(grpId, partId);
- partCtx.partPageMemory.invalidate(grpId, partId);
+ partCtx.partPageMemory.invalidate(grpId, partId);
- DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partCtx.partPageMemory.pageManager();
+ DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partCtx.partPageMemory.pageManager();
- pageMgr.pageStoreMap().removePageStore(grpId, partId); // Yes, it'll be invalid in a second.
+ pageMgr.pageStoreMap().removePageStore(grpId, partId); // Yes, it'll be invalid in a second.
- renameTempPartitionFile(workDir, partId);
- };
+ renameTempPartitionFile(workDir, partId);
+ };
- GridFutureAdapter<?> cpFut = defragmentationCheckpoint
- .forceCheckpoint("partition defragmented", null)
- .futureFor(CheckpointState.FINISHED);
+ GridFutureAdapter<?> cpFut = defragmentationCheckpoint
+ .forceCheckpoint("partition defragmented", null)
+ .futureFor(CheckpointState.FINISHED);
- cpFut.listen(cpLsnr);
+ cpFut.listen(cpLsnr);
- cmpFut.add((IgniteInternalFuture<Object>)cpFut);
- }
+ cmpFut.add((IgniteInternalFuture<Object>)cpFut);
+ }
- // A bit too general for now, but I like it more then saving only the last checkpoint future.
- cmpFut.markInitialized().get();
+ // A bit too general for now, but I like it more then saving only the last checkpoint future.
+ cmpFut.markInitialized().get();
- idxDfrgFut = new GridFinishedFuture<>();
+ idxDfrgFut = new GridFinishedFuture<>();
- if (filePageStoreMgr.hasIndexStore(grpId)) {
- defragmentIndexPartition(oldGrpCtx, newGrpCtx, linkMapByPart);
+ if (filePageStoreMgr.hasIndexStore(grpId)) {
+ defragmentIndexPartition(oldGrpCtx, newGrpCtx, linkMapByPart);
- idxDfrgFut = defragmentationCheckpoint
- .forceCheckpoint("index defragmented", null)
- .futureFor(CheckpointState.FINISHED);
- }
+ idxDfrgFut = defragmentationCheckpoint
+ .forceCheckpoint("index defragmented", null)
+ .futureFor(CheckpointState.FINISHED);
+ }
- idxDfrgFut.listen(fut -> {
- oldPageMem.invalidate(grpId, PageIdAllocator.INDEX_PARTITION);
+ idxDfrgFut = idxDfrgFut.chain(fut -> {
+ oldPageMem.invalidate(grpId, PageIdAllocator.INDEX_PARTITION);
- PageMemoryEx partPageMem = (PageMemoryEx)partDataRegion.pageMemory();
+ PageMemoryEx partPageMem = (PageMemoryEx)partDataRegion.pageMemory();
- partPageMem.invalidate(grpId, PageIdAllocator.INDEX_PARTITION);
+ partPageMem.invalidate(grpId, PageIdAllocator.INDEX_PARTITION);
- DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partPageMem.pageManager();
+ DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partPageMem.pageManager();
- pageMgr.pageStoreMap().removePageStore(grpId, PageIdAllocator.INDEX_PARTITION);
+ pageMgr.pageStoreMap().removePageStore(grpId, PageIdAllocator.INDEX_PARTITION);
- PageMemoryEx mappingPageMem = (PageMemoryEx)mappingDataRegion.pageMemory();
+ PageMemoryEx mappingPageMem = (PageMemoryEx)mappingDataRegion.pageMemory();
- pageMgr = (DefragmentationPageReadWriteManager)mappingPageMem.pageManager();
+ pageMgr = (DefragmentationPageReadWriteManager)mappingPageMem.pageManager();
- pageMgr.pageStoreMap().clear(grpId);
+ pageMgr.pageStoreMap().clear(grpId);
- renameTempIndexFile(workDir);
+ renameTempIndexFile(workDir);
- writeDefragmentationCompletionMarker(filePageStoreMgr.getPageStoreFileIoFactory(), workDir, log);
+ writeDefragmentationCompletionMarker(filePageStoreMgr.getPageStoreFileIoFactory(), workDir, log);
- batchRenameDefragmentedCacheGroupPartitions(workDir, log);
- });
+ batchRenameDefragmentedCacheGroupPartitions(workDir, log);
+ return null;
+ });
+ }
}
+ catch (DefragmentationCancelledException e) {
+ DefragmentationFileUtils.deleteLeftovers(workDir);
- // I guess we should wait for it?
- if (idxDfrgFut != null)
- idxDfrgFut.get();
+ throw e;
+ }
}
+ if (idxDfrgFut != null)
+ idxDfrgFut.get();
+
mntcReg.unregisterMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME);
log.info("Defragmentation completed. All partitions are defragmented.");
+
+ completionFut.onDone();
+ }
+ catch (DefragmentationCancelledException e) {
+ mntcReg.unregisterMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME);
+
+ log.info("Defragmentation has been cancelled.");
+
+ completionFut.onDone();
+ }
+ catch (Throwable t) {
+ completionFut.onDone(t);
+
+ throw t;
}
finally {
defragmentationCheckpoint.stop(true);
@@ -439,6 +482,11 @@ public class CachePartitionDefragmentationManager {
}
/** */
+ public IgniteInternalFuture<?> completionFuture() {
+ return completionFut.chain(future -> null);
+ }
+
+ /** */
public void createIndexPageStore(
int grpId,
File workDir,
@@ -476,6 +524,39 @@ public class CachePartitionDefragmentationManager {
}
/**
+ * Cancel the process of defragmentation.
+ *
+ * @return {@code true} if process was cancelled by this method.
+ */
+ public boolean cancel() {
+ if (completionFut.isDone())
+ return false;
+
+ if (cancel.compareAndSet(false, true)) {
+ try {
+ completionFut.get();
+ }
+ catch (Throwable ignore) {
+ }
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /** */
+ private void checkCancellation() throws DefragmentationCancelledException {
+ if (cancel.get())
+ throw new DefragmentationCancelledException();
+ }
+
+ /** */
+ public String status() {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /**
* Defragmentate partition.
*
* @param partCtx
@@ -501,6 +582,8 @@ public class CachePartitionDefragmentationManager {
AtomicInteger entriesProcessed = new AtomicInteger();
treeIter.iterate(tree, partCtx.cachePageMemory, (tree0, io, pageAddr, idx) -> {
+ checkCancellation();
+
if (System.currentTimeMillis() - lastCpLockTs.get() >= cpLockThreshold) {
defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock();
@@ -541,6 +624,8 @@ public class CachePartitionDefragmentationManager {
return true;
});
+ checkCancellation();
+
defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock();
defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadLock();
@@ -673,12 +758,15 @@ public class CachePartitionDefragmentationManager {
CheckpointTimeoutLock cpLock = defragmentationCheckpoint.checkpointTimeoutLock();
+ Runnable cancellationChecker = this::checkCancellation;
+
idx.defragment(
grpCtx,
newCtx,
(PageMemoryEx)partDataRegion.pageMemory(),
mappingByPartition,
- cpLock
+ cpLock,
+ cancellationChecker
);
}
@@ -824,4 +912,10 @@ public class CachePartitionDefragmentationManager {
this.newCacheDataStore = newCacheDataStore;
}
}
+
+ /** */
+ private static class DefragmentationCancelledException extends RuntimeException {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationFileUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationFileUtils.java
index b4273cd..214e17d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationFileUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationFileUtils.java
@@ -81,16 +81,7 @@ public class DefragmentationFileUtils {
U.delete(defragmentationCompletionMarkerFile(workDir));
- for (File file : workDir.listFiles()) {
- String fileName = file.getName();
-
- if (
- fileName.startsWith(DFRG_PARTITION_FILE_PREFIX)
- || fileName.startsWith(DFRG_INDEX_FILE_NAME)
- || fileName.startsWith(DFRG_LINK_MAPPING_FILE_PREFIX)
- )
- U.delete(file);
- }
+ deleteLeftovers(workDir);
}
catch (IgniteException e) {
throw new IgniteCheckedException(e);
@@ -98,6 +89,24 @@ public class DefragmentationFileUtils {
}
/**
+ * Deletes all defragmentation related file from work directory, except for completion marker.
+ *
+ * @param workDir Cache group working directory.
+ */
+ public static void deleteLeftovers(File workDir) {
+ for (File file : workDir.listFiles()) {
+ String fileName = file.getName();
+
+ if (
+ fileName.startsWith(DFRG_PARTITION_FILE_PREFIX)
+ || fileName.startsWith(DFRG_INDEX_FILE_NAME)
+ || fileName.startsWith(DFRG_LINK_MAPPING_FILE_PREFIX)
+ )
+ U.delete(file);
+ }
+ }
+
+ /**
* Checks whether cache group defragmentation completed or not. Completes it if all that's left is renaming.
*
* @param workDir Cache group working directory.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationParameters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationParameters.java
index 6bc3ddc..499e247 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationParameters.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationParameters.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.ignite.maintenance.MaintenanceTask;
@@ -29,31 +30,31 @@ import static org.apache.ignite.internal.processors.cache.persistence.defragment
*/
public class DefragmentationParameters {
/** */
- public static final String CACHE_GROUP_ID_SEPARATOR = ",";
+ public static final String SEPARATOR = "/";
/** */
- private final List<Integer> cacheGrpIds;
+ private final List<String> cacheNames;
/**
- * @param cacheGrpIds Id of cache group for defragmentations.
+ * @param cacheNames Names of caches for defragmentations.
*/
- private DefragmentationParameters(List<Integer> cacheGrpIds) {
- this.cacheGrpIds = cacheGrpIds;
+ private DefragmentationParameters(List<String> cacheNames) {
+ this.cacheNames = cacheNames;
}
/**
* Convert parameter to maintenance storage.
*
- * @param cacheGroupIds Cache group ids for defragmentation.
+ * @param cacheNames Names of caches for defragmentations.
* @return Maintenance task.
*/
- public static MaintenanceTask toStore(List<Integer> cacheGroupIds) {
+ public static MaintenanceTask toStore(List<String> cacheNames) {
return new MaintenanceTask(
DEFRAGMENTATION_MNTC_TASK_NAME,
- "Cache group defragmentation",
- cacheGroupIds.stream()
+ "Caches defragmentation",
+ cacheNames.stream()
.map(String::valueOf)
- .collect(Collectors.joining(CACHE_GROUP_ID_SEPARATOR))
+ .collect(Collectors.joining(SEPARATOR))
);
}
@@ -62,17 +63,20 @@ public class DefragmentationParameters {
* @return Defragmentation parameters.
*/
public static DefragmentationParameters fromStore(MaintenanceTask rawTask) {
+ if (rawTask.parameters() == null)
+ return new DefragmentationParameters(Collections.emptyList());
+
return new DefragmentationParameters(Arrays.stream(rawTask.parameters()
- .split(CACHE_GROUP_ID_SEPARATOR))
- .map(Integer::valueOf)
+ .split(SEPARATOR))
.collect(Collectors.toList())
);
}
/**
- * @return Cache groups ids.
+ * @return Cache names.
*/
- public List<Integer> cacheGroupIds() {
- return cacheGrpIds;
+ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+ public List<String> cacheNames() {
+ return cacheNames;
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationWorkflowCallback.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationWorkflowCallback.java
index a809579..b3bbe51 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationWorkflowCallback.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationWorkflowCallback.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.function.Function;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager;
+import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.maintenance.MaintenanceAction;
import org.apache.ignite.maintenance.MaintenanceWorkflowCallback;
import org.jetbrains.annotations.NotNull;
@@ -37,16 +38,22 @@ public class DefragmentationWorkflowCallback implements MaintenanceWorkflowCallb
/** Logger provider. */
private final Function<Class<?>, IgniteLogger> logProvider;
+ /** Failure processor. */
+ private final FailureProcessor failureProc;
+
/**
* @param logProvider Logger provider.
* @param defrgMgr Defragmentation manager.
+ * @param failureProc Failure processor.
*/
public DefragmentationWorkflowCallback(
Function<Class<?>, IgniteLogger> logProvider,
- CachePartitionDefragmentationManager defrgMgr
+ CachePartitionDefragmentationManager defrgMgr,
+ FailureProcessor failureProc
) {
this.defrgMgr = defrgMgr;
this.logProvider = logProvider;
+ this.failureProc = failureProc;
}
/** {@inheritDoc} */
@@ -56,11 +63,11 @@ public class DefragmentationWorkflowCallback implements MaintenanceWorkflowCallb
/** {@inheritDoc} */
@Override public @NotNull List<MaintenanceAction<?>> allActions() {
- return Collections.singletonList(automaticAction());
+ return Collections.singletonList(new StopDefragmentationAction(defrgMgr));
}
/** {@inheritDoc} */
@Override public @Nullable MaintenanceAction<Boolean> automaticAction() {
- return new ExecuteDefragmentationAction(logProvider, defrgMgr);
+ return new ExecuteDefragmentationAction(logProvider, defrgMgr, failureProc);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/ExecuteDefragmentationAction.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/ExecuteDefragmentationAction.java
index 42b2de7..0758772 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/ExecuteDefragmentationAction.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/ExecuteDefragmentationAction.java
@@ -21,7 +21,10 @@ import java.util.function.Function;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager;
+import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.maintenance.MaintenanceAction;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -36,29 +39,53 @@ class ExecuteDefragmentationAction implements MaintenanceAction<Boolean> {
/** Defragmentation manager. */
private final CachePartitionDefragmentationManager defrgMgr;
+ /** Failure processor. */
+ private final FailureProcessor failureProc;
+
/**
* @param logFunction Logger provider.
* @param defrgMgr Defragmentation manager.
+ * @param failureProc Failure processor.
*/
public ExecuteDefragmentationAction(
Function<Class<?>, IgniteLogger> logFunction,
- CachePartitionDefragmentationManager defrgMgr
+ CachePartitionDefragmentationManager defrgMgr,
+ FailureProcessor failureProc
) {
this.log = logFunction.apply(ExecuteDefragmentationAction.class);
this.defrgMgr = defrgMgr;
+ this.failureProc = failureProc;
}
/** {@inheritDoc} */
@Override public Boolean execute() {
try {
- defrgMgr.executeDefragmentation();
+ defrgMgr.beforeDefragmentation();
}
catch (IgniteCheckedException | IgniteException e) {
- log.error("Defragmentation is failed", e);
+ log.error("Checkpoint before defragmentation failed", e);
return false;
}
+ Thread defrgThread = new Thread(() -> {
+ try {
+ defrgMgr.executeDefragmentation();
+ }
+ catch (Throwable e) {
+ log.error("Defragmentation failed", e);
+
+ // TODO Check other options.
+ failureProc.process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+ }
+ });
+
+ defrgThread.setName("defragmentation-thread");
+
+ defrgThread.setDaemon(true);
+
+ defrgThread.start();
+
return true;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/ExecuteDefragmentationAction.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/StopDefragmentationAction.java
similarity index 56%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/ExecuteDefragmentationAction.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/StopDefragmentationAction.java
index 42b2de7..4b40b9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/ExecuteDefragmentationAction.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/StopDefragmentationAction.java
@@ -17,58 +17,37 @@
package org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance;
-import java.util.function.Function;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager;
import org.apache.ignite.maintenance.MaintenanceAction;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
- * Action which allows to start the defragmentation process.
+ * Action which allows to stop the defragmentation at any time from maintenance mode processor.
*/
-class ExecuteDefragmentationAction implements MaintenanceAction<Boolean> {
- /** Logger. */
- private final IgniteLogger log;
-
+class StopDefragmentationAction implements MaintenanceAction<Boolean> {
/** Defragmentation manager. */
- private final CachePartitionDefragmentationManager defrgMgr;
+ private final CachePartitionDefragmentationManager defragmentationMgr;
/**
- * @param logFunction Logger provider.
- * @param defrgMgr Defragmentation manager.
+ * @param defragmentationMgr Defragmentation manager.
*/
- public ExecuteDefragmentationAction(
- Function<Class<?>, IgniteLogger> logFunction,
- CachePartitionDefragmentationManager defrgMgr
- ) {
- this.log = logFunction.apply(ExecuteDefragmentationAction.class);
- this.defrgMgr = defrgMgr;
+ public StopDefragmentationAction(CachePartitionDefragmentationManager defragmentationMgr) {
+ this.defragmentationMgr = defragmentationMgr;
}
/** {@inheritDoc} */
@Override public Boolean execute() {
- try {
- defrgMgr.executeDefragmentation();
- }
- catch (IgniteCheckedException | IgniteException e) {
- log.error("Defragmentation is failed", e);
-
- return false;
- }
-
- return true;
+ return defragmentationMgr.cancel();
}
/** {@inheritDoc} */
@Override public @NotNull String name() {
- return "execute";
+ return "stop";
}
/** {@inheritDoc} */
@Override public @Nullable String description() {
- return "Starting the process of defragmentation.";
+ return "Stopping the defragmentation process immediately";
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 236de0c..117fdeb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -501,6 +501,7 @@ public interface GridQueryIndexing {
* @param partPageMem Partition page memory.
* @param mappingByPart Mapping page memory.
* @param cpLock Defragmentation checkpoint read lock.
+ * @param cancellationChecker Cancellation checker.
*
* @throws IgniteCheckedException If failed.
*/
@@ -509,6 +510,7 @@ public interface GridQueryIndexing {
CacheGroupContext newCtx,
PageMemoryEx partPageMem,
IntMap<LinkMap> mappingByPart,
- CheckpointTimeoutLock cpLock
+ CheckpointTimeoutLock cpLock,
+ Runnable cancellationChecker
) throws IgniteCheckedException;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index b3b644d..bda05be 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -5803,10 +5803,24 @@ public abstract class IgniteUtils {
* @param e Enum value to write, possibly {@code null}.
* @throws IOException If write failed.
*/
- public static <E extends Enum> void writeEnum(DataOutput out, E e) throws IOException {
+ public static <E extends Enum<E>> void writeEnum(DataOutput out, E e) throws IOException {
out.writeByte(e == null ? -1 : e.ordinal());
}
+ /** */
+ public static <E extends Enum<E>> E readEnum(DataInput in, Class<E> enumCls) throws IOException {
+ byte ordinal = in.readByte();
+
+ if (ordinal == (byte)-1)
+ return null;
+
+ int idx = ordinal & 0xFF;
+
+ E[] values = enumCls.getEnumConstants();
+
+ return idx < values.length ? values[idx] : null;
+ }
+
/**
* Gets collection value by index.
*
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationOperation.java
new file mode 100644
index 0000000..9cc5eab
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationOperation.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.defragmentation;
+
+/** */
+public enum VisorDefragmentationOperation {
+ /** */
+ SCHEDULE,
+ /** */
+ STATUS,
+ /** */
+ CANCEL
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTask.java
new file mode 100644
index 0000000..14cea62
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTask.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.defragmentation;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.processors.task.GridVisorManagementTask;
+import org.apache.ignite.internal.visor.VisorJob;
+import org.apache.ignite.internal.visor.VisorMultiNodeTask;
+import org.apache.ignite.maintenance.MaintenanceAction;
+import org.apache.ignite.maintenance.MaintenanceRegistry;
+import org.apache.ignite.maintenance.MaintenanceTask;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.DEFRAGMENTATION_MNTC_TASK_NAME;
+import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance.DefragmentationParameters.toStore;
+
+/** */
+@GridInternal
+@GridVisorManagementTask
+public class VisorDefragmentationTask extends VisorMultiNodeTask
+ <VisorDefragmentationTaskArg, VisorDefragmentationTaskResult, VisorDefragmentationTaskResult>
+{
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override protected VisorJob<VisorDefragmentationTaskArg, VisorDefragmentationTaskResult> job(
+ VisorDefragmentationTaskArg arg
+ ) {
+ return new VisorDefragmentationJob(arg, debug);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override protected VisorDefragmentationTaskResult reduce0(List<ComputeJobResult> results) {
+ if (taskArg.operation() == VisorDefragmentationOperation.SCHEDULE) {
+ StringBuilder msg = new StringBuilder();
+
+ for (ComputeJobResult res : results) {
+ msg.append(res.getNode().consistentId()).append(":\n");
+
+ if (res.getData() == null)
+ msg.append(" err=").append(res.getException()).append('\n');
+ else {
+ VisorDefragmentationTaskResult data = res.getData();
+
+ msg.append(" success=").append(data.isSuccess()).append('\n');
+ msg.append(" msg=").append(data.getMessage()).append('\n');
+ }
+ }
+
+ return new VisorDefragmentationTaskResult(true, msg.toString());
+ }
+
+ assert results.size() == 1;
+
+ ComputeJobResult res = results.get(0);
+
+ if (res.getException() == null)
+ return res.getData();
+
+ throw res.getException();
+ }
+
+ /** */
+ private static class VisorDefragmentationJob extends VisorJob<VisorDefragmentationTaskArg, VisorDefragmentationTaskResult> {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Create job with specified argument.
+ *
+ * @param arg Job argument.
+ * @param debug Flag indicating whether debug information should be printed into node log.
+ */
+ protected VisorDefragmentationJob(@Nullable VisorDefragmentationTaskArg arg, boolean debug) {
+ super(arg, debug);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected VisorDefragmentationTaskResult run(
+ @Nullable VisorDefragmentationTaskArg arg
+ ) throws IgniteException {
+ switch (arg.operation()) {
+ case SCHEDULE:
+ return runSchedule(arg);
+
+ case STATUS:
+ return runStatus(arg);
+
+ case CANCEL:
+ return runCancel(arg);
+ }
+
+ throw new IllegalArgumentException("Operation: " + arg.operation());
+ }
+
+ /** */
+ private VisorDefragmentationTaskResult runSchedule(VisorDefragmentationTaskArg arg) {
+ MaintenanceRegistry mntcReg = ignite.context().maintenanceRegistry();
+
+ MaintenanceTask oldTask;
+
+ try {
+ List<String> cacheNames = arg.cacheNames();
+
+ oldTask = mntcReg.registerMaintenanceTask(toStore(cacheNames == null ? Collections.emptyList() : cacheNames));
+ }
+ catch (IgniteCheckedException e) {
+ return new VisorDefragmentationTaskResult(false, "Scheduling failed: " + e.getMessage());
+ }
+
+ return new VisorDefragmentationTaskResult(
+ true,
+ "Scheduling completed successfully." +
+ (oldTask == null ? "" : " Previously scheduled task has been removed.")
+ );
+ }
+
+ /** */
+ private VisorDefragmentationTaskResult runStatus(VisorDefragmentationTaskArg arg) {
+ MaintenanceRegistry mntcReg = ignite.context().maintenanceRegistry();
+
+ if (!mntcReg.isMaintenanceMode())
+ return new VisorDefragmentationTaskResult(false, "Node is not in maintenance node.");
+
+ IgniteCacheDatabaseSharedManager dbMgr = ignite.context().cache().context().database();
+
+ assert dbMgr instanceof GridCacheDatabaseSharedManager;
+
+ CachePartitionDefragmentationManager defrgMgr = ((GridCacheDatabaseSharedManager)dbMgr)
+ .defragmentationManager();
+
+ if (defrgMgr == null)
+ return new VisorDefragmentationTaskResult(true, "There's no active defragmentation process on the node.");
+
+ return new VisorDefragmentationTaskResult(true, defrgMgr.status());
+ }
+
+ /** */
+ private VisorDefragmentationTaskResult runCancel(VisorDefragmentationTaskArg arg) {
+ assert arg.cacheNames() == null : "Cancelling specific caches is not yet implemented";
+
+ MaintenanceRegistry mntcReg = ignite.context().maintenanceRegistry();
+
+ if (!mntcReg.isMaintenanceMode()) {
+ boolean deleted = mntcReg.unregisterMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME);
+
+ String msg = deleted
+ ? "Scheduled defragmentation task cancelled successfully."
+ : "Scheduled defragmentation task is not found.";
+
+ return new VisorDefragmentationTaskResult(true, msg);
+ }
+ else {
+ List<MaintenanceAction<?>> actions;
+
+ try {
+ actions = mntcReg.actionsForMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME);
+ }
+ catch (IgniteException e) {
+ return new VisorDefragmentationTaskResult(true, "Defragmentation is already completed or has been cancelled previously.");
+ }
+
+ Optional<MaintenanceAction<?>> stopAct = actions.stream().filter(a -> "stop".equals(a.name())).findAny();
+
+ assert stopAct.isPresent();
+
+ try {
+ Object res = stopAct.get().execute();
+
+ assert res instanceof Boolean;
+
+ boolean cancelled = (Boolean)res;
+
+ String msg = cancelled
+ ? "Defragmentation cancelled successfully."
+ : "Defragmentation is already completed or has been cancelled previously.";
+
+ return new VisorDefragmentationTaskResult(true, msg);
+ }
+ catch (Exception e) {
+ return new VisorDefragmentationTaskResult(false, "Exception occurred: " + e.getMessage());
+ }
+ }
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskArg.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskArg.java
new file mode 100644
index 0000000..1b1c8b1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskArg.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.defragmentation;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.List;
+import org.apache.ignite.internal.dto.IgniteDataTransferObject;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/** */
+@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+public class VisorDefragmentationTaskArg extends IgniteDataTransferObject {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private VisorDefragmentationOperation operation;
+
+ /** */
+ private List<String> nodeIds;
+
+ /** */
+ private List<String> cacheNames;
+
+ /** Empty constructor for serialization. */
+ public VisorDefragmentationTaskArg() {
+ // No-op.
+ }
+
+ /** */
+ public VisorDefragmentationTaskArg(
+ VisorDefragmentationOperation operation,
+ List<String> nodeIds,
+ List<String> cacheNames
+ ) {
+
+ this.operation = operation;
+ this.nodeIds = nodeIds;
+ this.cacheNames = cacheNames;
+ }
+
+ /** */
+ public VisorDefragmentationOperation operation() {
+ return operation;
+ }
+
+ /** */
+ public List<String> nodeIds() {
+ return nodeIds;
+ }
+
+ /** */
+ public List<String> cacheNames() {
+ return cacheNames;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeExternalData(ObjectOutput out) throws IOException {
+ U.writeEnum(out, operation);
+
+ U.writeCollection(out, nodeIds);
+
+ U.writeCollection(out, cacheNames);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException {
+ operation = U.readEnum(in, VisorDefragmentationOperation.class);
+
+ nodeIds = U.readList(in);
+
+ cacheNames = U.readList(in);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskResult.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskResult.java
new file mode 100644
index 0000000..4575ac2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskResult.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.defragmentation;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.dto.IgniteDataTransferObject;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/** */
+public class VisorDefragmentationTaskResult extends IgniteDataTransferObject {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private boolean success;
+
+ /** */
+ private String msg;
+
+ /** Empty constructor for serialization. */
+ public VisorDefragmentationTaskResult() {
+ // No-op.
+ }
+
+ /** */
+ public VisorDefragmentationTaskResult(boolean success, String msg) {
+ this.success = success;
+
+ this.msg = msg;
+ }
+
+ /** */
+ public boolean isSuccess() {
+ return success;
+ }
+
+ /** */
+ public String getMessage() {
+ return msg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeExternalData(ObjectOutput out) throws IOException {
+ out.writeBoolean(success);
+
+ U.writeString(out, msg);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException {
+ success = in.readBoolean();
+
+ msg = U.readString(in);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/maintenance/MaintenanceRegistry.java b/modules/core/src/main/java/org/apache/ignite/maintenance/MaintenanceRegistry.java
index 9cebef0..a2cabde 100644
--- a/modules/core/src/main/java/org/apache/ignite/maintenance/MaintenanceRegistry.java
+++ b/modules/core/src/main/java/org/apache/ignite/maintenance/MaintenanceRegistry.java
@@ -104,8 +104,9 @@ public interface MaintenanceRegistry {
* Deletes {@link MaintenanceTask} of given ID from maintenance registry.
*
* @param maintenanceTaskName name of {@link MaintenanceTask} to be deleted.
+ * @return {@code true} if existing task has been deleted.
*/
- public void unregisterMaintenanceTask(String maintenanceTaskName);
+ public boolean unregisterMaintenanceTask(String maintenanceTaskName);
/**
* Returns active {@link MaintenanceTask} by its name.
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 92d01dd..f480d0f 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -2124,6 +2124,11 @@ org.apache.ignite.internal.visor.debug.VisorThreadDumpTaskResult
org.apache.ignite.internal.visor.debug.VisorThreadInfo
org.apache.ignite.internal.visor.debug.VisorThreadLockInfo
org.apache.ignite.internal.visor.debug.VisorThreadMonitorInfo
+org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationTask
+org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationTask$VisorDefragmentationJob
+org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationOperation
+org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationTaskArg
+org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationTaskResult
org.apache.ignite.internal.visor.event.VisorGridDeploymentEvent
org.apache.ignite.internal.visor.event.VisorGridDiscoveryEvent
org.apache.ignite.internal.visor.event.VisorGridEvent
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WalModeChangeAdvancedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WalModeChangeAdvancedSelfTest.java
index 10e1db1..9b1067a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WalModeChangeAdvancedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WalModeChangeAdvancedSelfTest.java
@@ -43,6 +43,7 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DATA_FILENAME;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CORRUPTED_DATA_FILES_MNTC_TASK_NAME;
/**
* Concurrent and advanced tests for WAL state change.
@@ -190,10 +191,18 @@ public class WalModeChangeAdvancedSelfTest extends WalModeChangeCommonAbstractSe
cleanCacheDir(cacheToClean);
- // Node should start successfully and not enter maintenance mode as MaintenanceRecord will be cleaned
+ // Node should start successfully and enter maintenance mode. MaintenanceRecord will be cleaned
// automatically because corrupted PDS was deleted during downtime
srv = startGrid(config(SRV_1, false, false));
- assertFalse(srv.context().maintenanceRegistry().isMaintenanceMode());
+ assertTrue(srv.context().maintenanceRegistry().isMaintenanceMode());
+
+ try {
+ srv.context().maintenanceRegistry().actionsForMaintenanceTask(CORRUPTED_DATA_FILES_MNTC_TASK_NAME);
+
+ fail("Maintenance task is not completed yet for some reason.");
+ }
+ catch (Exception ignore) {
+ }
stopAllGrids(false);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDefragmentationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDefragmentationTest.java
index 8f06a48..4ce9bb4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDefragmentationTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDefragmentationTest.java
@@ -39,6 +39,9 @@ import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteState;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.IgnitionListener;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
@@ -48,12 +51,12 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.FailureHandler;
import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.maintenance.MaintenanceFileStore;
import org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
-import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.maintenance.MaintenanceRegistry;
import org.apache.ignite.testframework.GridTestUtils;
@@ -200,10 +203,25 @@ public class IgnitePdsDefragmentationTest extends GridCommonAbstractTest {
startGrid(0);
+ waitForDefragmentation(0);
+
+ assertEquals(ClusterState.INACTIVE, grid(0).context().state().clusterState().state());
+
+ GridTestUtils.assertThrowsAnyCause(
+ log,
+ () -> {
+ grid(0).cluster().state(ClusterState.ACTIVE);
+
+ return null;
+ },
+ IgniteCheckedException.class,
+ "Failed to activate cluster (node is in maintenance mode)"
+ );
+
long[] newPartLen = partitionSizes(workDir);
for (int p = 0; p < PARTS; p++)
- assertTrue(newPartLen[p] < oldPartLen[p]); //TODO Fails.
+ assertTrue(newPartLen[p] < oldPartLen[p]);
long newIdxFileLen = new File(workDir, FilePageStoreManager.INDEX_FILE_NAME).length();
@@ -252,11 +270,22 @@ public class IgnitePdsDefragmentationTest extends GridCommonAbstractTest {
}
/** */
+ protected void waitForDefragmentation(int idx) throws IgniteCheckedException {
+ IgniteEx ig = grid(idx);
+
+ ((GridCacheDatabaseSharedManager)ig.context().cache().context().database())
+ .defragmentationManager()
+ .completionFuture()
+ .get();
+ }
+
+ /** */
protected void createMaintenanceRecord() throws IgniteCheckedException {
IgniteEx grid = grid(0);
+
MaintenanceRegistry mntcReg = grid.context().maintenanceRegistry();
- mntcReg.registerMaintenanceTask(toStore(Collections.singletonList(groupIdForCache(grid, DEFAULT_CACHE_NAME))));
+ mntcReg.registerMaintenanceTask(toStore(Collections.singletonList(DEFAULT_CACHE_NAME)));
}
/**
@@ -383,6 +412,29 @@ public class IgnitePdsDefragmentationTest extends GridCommonAbstractTest {
File workDir = resolveCacheWorkDir(ig);
+ //Defragmentation should fail when node starts.
+ startAndAwaitNodeFail(workDir);
+
+ c.accept(workDir);
+
+ startGrid(0); // Fails here VERY rarely. WTF?
+
+ waitForDefragmentation(0);
+
+ stopGrid(0);
+
+ // Everything must be completed.
+ startGrid(0).cluster().state(ClusterState.ACTIVE);
+
+ validateCache(grid(0).cache(DEFAULT_CACHE_NAME));
+
+ validateLeftovers(workDir);
+ }
+
+ /**
+ * @throws IgniteInterruptedCheckedException If fail.
+ */
+ private void startAndAwaitNodeFail(File workDir) throws IgniteInterruptedCheckedException {
String errMsg = "Failed to create defragmentation completion marker.";
AtomicBoolean errOccurred = new AtomicBoolean();
@@ -405,34 +457,32 @@ public class IgnitePdsDefragmentationTest extends GridCommonAbstractTest {
return cfg;
};
- try {
- startGrid(0, cfgOp);
- }
- catch (Exception ignore) {
- // No-op.
- }
-
- // Failed node can leave interrupted status of the thread that needs to be cleared,
- // otherwise following "wait" wouldn't work.
- // This call can't be moved inside of "catch" block because interruption can actually be silent.
- Thread.interrupted();
-
- assertTrue(GridTestUtils.waitForCondition(errOccurred::get, 10_000L));
-
- assertTrue(GridTestUtils.waitForCondition(() -> G.allGrids().isEmpty(), 10_000L));
-
- c.accept(workDir);
-
- startGrid(0);
-
- stopGrid(0);
+ AtomicBoolean nodeStopped = new AtomicBoolean();
+ IgnitionListener nodeStopListener = (name, state) -> {
+ if (name.equals(getTestIgniteInstanceName(0)) && state == IgniteState.STOPPED_ON_FAILURE)
+ nodeStopped.set(true);
+ };
- // Everything must be completed.
- startGrid(0).cluster().state(ClusterState.ACTIVE);
+ Ignition.addListener(nodeStopListener);
+ try {
+ try {
+ startGrid(0, cfgOp);
+ }
+ catch (Exception ignore) {
+ // No-op.
+ }
- validateCache(grid(0).cache(DEFAULT_CACHE_NAME));
+ // Failed node can leave interrupted status of the thread that needs to be cleared,
+ // otherwise following "wait" wouldn't work.
+ // This call can't be moved inside of "catch" block because interruption can actually be silent.
+ Thread.interrupted();
- validateLeftovers(workDir);
+ assertTrue(GridTestUtils.waitForCondition(errOccurred::get, 3_000L));
+ assertTrue(GridTestUtils.waitForCondition(nodeStopped::get, 3_000L));
+ }
+ finally {
+ Ignition.removeListener(nodeStopListener);
+ }
}
/** */
@@ -463,6 +513,8 @@ public class IgnitePdsDefragmentationTest extends GridCommonAbstractTest {
startGrid(0);
+ waitForDefragmentation(0);
+
File workDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false);
AtomicReference<File> cachePartFile = new AtomicReference<>();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
index 5f0b04f..732a48a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
@@ -345,7 +345,8 @@ public class DummyQueryIndexing implements GridQueryIndexing {
CacheGroupContext newCtx,
PageMemoryEx partPageMem,
IntMap<LinkMap> mappingByPart,
- CheckpointTimeoutLock cpLock
+ CheckpointTimeoutLock cpLock,
+ Runnable cancellationChecker
) throws IgniteCheckedException {
// No-op.
}
diff --git a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output
index 5950ed1..a4c2ec4 100644
--- a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output
+++ b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output
@@ -221,6 +221,15 @@ If the file name isn't specified the output file name is: '<typeId>.bin'
Backup data files of only given caches:
control.(sh|bat) --persistence backup caches cache1,cache2,cache3
+ Schedule PDS defragmentation on given nodes for all caches:
+ control.(sh|bat) --defragmentation schedule --nodes consistentId0,consistentId1
+
+ Schedule PDS defragmentation on given nodes but only for given caches:
+ control.(sh|bat) --defragmentation schedule --nodes consistentId0,consistentId1 --caches cache1,cache2,cache3
+
+ Cancel scheduled or active PDS defragmentation on underlying node:
+ control.(sh|bat) --defragmentation cancel
+
By default commands affecting the cluster require interactive confirmation.
Use --yes option to disable it.
diff --git a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output
index 5950ed1..a4c2ec4 100644
--- a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output
+++ b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output
@@ -221,6 +221,15 @@ If the file name isn't specified the output file name is: '<typeId>.bin'
Backup data files of only given caches:
control.(sh|bat) --persistence backup caches cache1,cache2,cache3
+ Schedule PDS defragmentation on given nodes for all caches:
+ control.(sh|bat) --defragmentation schedule --nodes consistentId0,consistentId1
+
+ Schedule PDS defragmentation on given nodes but only for given caches:
+ control.(sh|bat) --defragmentation schedule --nodes consistentId0,consistentId1 --caches cache1,cache2,cache3
+
+ Cancel scheduled or active PDS defragmentation on underlying node:
+ control.(sh|bat) --defragmentation cancel
+
By default commands affecting the cluster require interactive confirmation.
Use --yes option to disable it.
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 517bc69..6d4b4e3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -3201,8 +3201,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
CacheGroupContext newCtx,
PageMemoryEx partPageMem,
IntMap<LinkMap> mappingByPart,
- CheckpointTimeoutLock cpLock
+ CheckpointTimeoutLock cpLock,
+ Runnable cancellationChecker
) throws IgniteCheckedException {
- defragmentation.defragment(grpCtx, newCtx, partPageMem, mappingByPart, cpLock, log);
+ defragmentation.defragment(grpCtx, newCtx, partPageMem, mappingByPart, cpLock, cancellationChecker, log);
}
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/defragmentation/IndexingDefragmentation.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/defragmentation/IndexingDefragmentation.java
index c41f587..19d15c4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/defragmentation/IndexingDefragmentation.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/defragmentation/IndexingDefragmentation.java
@@ -80,6 +80,7 @@ public class IndexingDefragmentation {
* @param partPageMem Partition page memory.
* @param mappingByPartition Mapping page memory.
* @param cpLock Defragmentation checkpoint read lock.
+ * @param cancellationChecker Cancellation checker.
* @param log Log.
*
* @throws IgniteCheckedException If failed.
@@ -90,6 +91,7 @@ public class IndexingDefragmentation {
PageMemoryEx partPageMem,
IntMap<LinkMap> mappingByPartition,
CheckpointTimeoutLock cpLock,
+ Runnable cancellationChecker,
IgniteLogger log
) throws IgniteCheckedException {
int pageSize = grpCtx.cacheObjectContext().kernalContext().grid().configuration().getDataStorageConfiguration().getPageSize();
@@ -115,6 +117,8 @@ public class IndexingDefragmentation {
if (cctx.groupId() != grpCtx.groupId())
continue; // Not our index.
+ cancellationChecker.run();
+
GridH2RowDescriptor rowDesc = table.rowDescriptor();
List<Index> indexes = table.getIndexes();
@@ -155,6 +159,8 @@ public class IndexingDefragmentation {
H2Tree tree = oldH2Idx.treeForRead(i);
treeIterator.iterate(tree, oldCachePageMem, (theTree, io, pageAddr, idx) -> {
+ cancellationChecker.run();
+
if (System.currentTimeMillis() - lastCpLockTs.get() >= cpLockThreshold) {
cpLock.checkpointReadUnlock();
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
index bbb69ae..cf6b422 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
@@ -136,6 +136,8 @@ public class IgnitePdsIndexingDefragmentationTest extends IgnitePdsDefragmentati
startGrid(0);
+ waitForDefragmentation(0);
+
long newIdxFileLen = new File(workDir, FilePageStoreManager.INDEX_FILE_NAME).length();
assertTrue(newIdxFileLen <= oldIdxFileLen);