You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2020/12/16 10:26:13 UTC
[ignite] branch master updated: IGNITE-13743 JMX API for
Defragmentation monitoring and management - Fixes #8496.
This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 702dd49 IGNITE-13743 JMX API for Defragmentation monitoring and management - Fixes #8496.
702dd49 is described below
commit 702dd49fc54373042f929aa94d6426c247d7a76d
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Wed Dec 16 13:21:53 2020 +0300
IGNITE-13743 JMX API for Defragmentation monitoring and management - Fixes #8496.
Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
.../commandline/DefragmentationCommand.java | 1 -
.../apache/ignite/internal/GridKernalContext.java | 8 +
.../ignite/internal/GridKernalContextImpl.java | 13 +
.../internal/managers/IgniteMBeansManager.java | 6 +
.../CachePartitionDefragmentationManager.java | 308 ++++++++++++-------
.../defragmentation/DefragmentationMXBeanImpl.java | 85 +++++
.../defragmentation/IgniteDefragmentation.java | 341 +++++++++++++++++++++
.../defragmentation/IgniteDefragmentationImpl.java | 223 ++++++++++++++
.../wal/reader/StandaloneGridKernalContext.java | 6 +
.../defragmentation/VisorDefragmentationTask.java | 124 +++-----
.../VisorDefragmentationTaskArg.java | 14 -
.../ignite/mxbean/DefragmentationMXBean.java | 73 +++++
.../defragmentation/DefragmentationMXBeanTest.java | 322 +++++++++++++++++++
13 files changed, 1317 insertions(+), 207 deletions(-)
diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java
index ec5c1f0..e421863 100644
--- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java
+++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java
@@ -226,7 +226,6 @@ public class DefragmentationCommand implements Command<DefragmentationArguments>
private VisorDefragmentationTaskArg convertArguments() {
return new VisorDefragmentationTaskArg(
convertSubcommand(args.subcommand()),
- args.nodeIds(),
args.cacheNames()
);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index a799f60..2e2fae6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
import org.apache.ignite.internal.processors.authentication.IgniteAuthenticationProcessor;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor;
+import org.apache.ignite.internal.processors.cache.persistence.defragmentation.IgniteDefragmentation;
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFoldersResolver;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
@@ -453,6 +454,13 @@ public interface GridKernalContext extends Iterable<GridComponent> {
public GridEncryptionManager encryption();
/**
+ * Gets defragmentation manager.
+ *
+ * @return Defragmentation manager.
+ */
+ public IgniteDefragmentation defragmentation();
+
+ /**
* Gets workers registry.
*
* @return Workers registry.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 092cf1a..a9f80be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -57,6 +57,8 @@ import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManage
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor;
+import org.apache.ignite.internal.processors.cache.persistence.defragmentation.IgniteDefragmentation;
+import org.apache.ignite.internal.processors.cache.persistence.defragmentation.IgniteDefragmentationImpl;
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFoldersResolver;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
@@ -176,6 +178,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
/** */
@GridToStringExclude
+ private IgniteDefragmentation defragMgr;
+
+ /** */
+ @GridToStringExclude
private GridTracingManager tracingMgr;
/*
@@ -561,6 +567,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
marshCtx = new MarshallerContextImpl(plugins, clsFilter);
+ defragMgr = new IgniteDefragmentationImpl(this);
+
try {
spring = SPRING.create(false);
}
@@ -913,6 +921,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
}
/** {@inheritDoc} */
+ @Override public IgniteDefragmentation defragmentation() {
+ return defragMgr;
+ }
+
+ /** {@inheritDoc} */
@Override public WorkersRegistry workersRegistry() {
return workersRegistry;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java
index 5c502e9..a35bbd1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.TransactionMetricsMxBeanImpl;
import org.apache.ignite.internal.TransactionsMXBeanImpl;
import org.apache.ignite.internal.managers.encryption.EncryptionMXBeanImpl;
import org.apache.ignite.internal.processors.cache.persistence.DataStorageMXBeanImpl;
+import org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationMXBeanImpl;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMXBeanImpl;
import org.apache.ignite.internal.processors.cache.warmup.WarmUpMXBeanImpl;
import org.apache.ignite.internal.processors.cluster.BaselineAutoAdjustMXBeanImpl;
@@ -52,6 +53,7 @@ import org.apache.ignite.mxbean.BaselineAutoAdjustMXBean;
import org.apache.ignite.mxbean.ClusterMetricsMXBean;
import org.apache.ignite.mxbean.ComputeMXBean;
import org.apache.ignite.mxbean.DataStorageMXBean;
+import org.apache.ignite.mxbean.DefragmentationMXBean;
import org.apache.ignite.mxbean.EncryptionMXBean;
import org.apache.ignite.mxbean.FailureHandlingMxBean;
import org.apache.ignite.mxbean.IgniteMXBean;
@@ -187,6 +189,10 @@ public class IgniteMBeansManager {
SnapshotMXBean snpMXBean = new SnapshotMXBeanImpl(ctx);
registerMBean("Snapshot", snpMXBean.getClass().getSimpleName(), snpMXBean, SnapshotMXBean.class);
+ // Defragmentation.
+ DefragmentationMXBean defragMXBean = new DefragmentationMXBeanImpl(ctx);
+ registerMBean("Defragmentation", defragMXBean.getClass().getSimpleName(), defragMXBean, DefragmentationMXBean.class);
+
// Metrics configuration
MetricsMxBean metricsMxBean = new MetricsMxBeanImpl(ctx.metric(), log);
registerMBean("Metrics", metricsMxBean.getClass().getSimpleName(), metricsMxBean, MetricsMxBean.class);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
index aa82a23..48616b6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
@@ -19,12 +19,10 @@ package org.apache.ignite.internal.processors.cache.persistence.defragmentation;
import java.io.File;
import java.nio.file.Path;
-import java.text.DecimalFormat;
-import java.text.DecimalFormatSymbols;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
@@ -46,6 +44,7 @@ import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheType;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager.CacheDataStore;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.CheckpointState;
@@ -154,7 +153,7 @@ public class CachePartitionDefragmentationManager {
private final AtomicBoolean cancel = new AtomicBoolean();
/** */
- private final DefragmentationStatus status = new DefragmentationStatus();
+ private final Status status = new Status();
/** */
private final GridFutureAdapter<?> completionFut = new GridFutureAdapter<>();
@@ -221,7 +220,30 @@ public class CachePartitionDefragmentationManager {
/** */
public void executeDefragmentation() throws IgniteCheckedException {
- status.onStart(cacheGrpCtxsForDefragmentation);
+ Map<Integer, List<CacheDataStore>> oldStores = new HashMap<>();
+
+ for (CacheGroupContext oldGrpCtx : cacheGrpCtxsForDefragmentation) {
+ int grpId = oldGrpCtx.groupId();
+
+ final IgniteCacheOffheapManager offheap = oldGrpCtx.offheap();
+
+ List<CacheDataStore> oldCacheDataStores = stream(offheap.cacheDataStores().spliterator(), false)
+ .filter(store -> {
+ try {
+ return filePageStoreMgr.exists(grpId, store.partId());
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ })
+ .collect(Collectors.toList());
+
+ oldStores.put(grpId, oldCacheDataStores);
+ }
+
+ int partitionCount = oldStores.values().stream().mapToInt(List::size).sum();
+
+ status.onStart(cacheGrpCtxsForDefragmentation, partitionCount);
try {
// Now the actual process starts.
@@ -235,8 +257,10 @@ public class CachePartitionDefragmentationManager {
File workDir = filePageStoreMgr.cacheWorkDir(oldGrpCtx.sharedGroup(), oldGrpCtx.cacheOrGroupName());
+ List<CacheDataStore> oldCacheDataStores = oldStores.get(grpId);
+
if (skipAlreadyDefragmentedCacheGroup(workDir, grpId, log)) {
- status.onCacheGroupSkipped(oldGrpCtx);
+ status.onCacheGroupSkipped(oldGrpCtx, oldCacheDataStores.size());
continue;
}
@@ -244,17 +268,6 @@ public class CachePartitionDefragmentationManager {
try {
GridCacheOffheapManager offheap = (GridCacheOffheapManager)oldGrpCtx.offheap();
- List<CacheDataStore> oldCacheDataStores = stream(offheap.cacheDataStores().spliterator(), false)
- .filter(store -> {
- try {
- return filePageStoreMgr.exists(grpId, store.partId());
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- })
- .collect(Collectors.toList());
-
status.onCacheGroupStart(oldGrpCtx, oldCacheDataStores.size());
if (workDir == null || oldCacheDataStores.isEmpty()) {
@@ -609,8 +622,8 @@ public class CachePartitionDefragmentationManager {
}
/** */
- public String status() {
- return status.toString();
+ public Status status() {
+ return status;
}
/**
@@ -979,58 +992,110 @@ public class CachePartitionDefragmentationManager {
private static final long serialVersionUID = 0L;
}
- /** */
- private class DefragmentationStatus {
- /** */
+ /** Defragmentation status. */
+ class Status {
+ /** Defragmentation start timestamp. */
private long startTs;
- /** */
+ /** Defragmentation finish timestamp. */
private long finishTs;
- /** */
- private final Set<String> scheduledGroups = new TreeSet<>();
+ /** Total count of partitions. */
+ private int totalPartitionCount;
- /** */
- private final Map<CacheGroupContext, DefragmentationCacheGroupProgress> progressGroups
- = new TreeMap<>(comparing(CacheGroupContext::cacheOrGroupName));
+ /** Partitions, that are already defragmented. */
+ private int defragmentedPartitionCount;
- /** */
- private final Map<CacheGroupContext, DefragmentationCacheGroupProgress> finishedGroups
- = new TreeMap<>(comparing(CacheGroupContext::cacheOrGroupName));
+ /** Cache groups scheduled for defragmentation. */
+ private final Set<String> scheduledGroups;
- /** */
- private final Set<String> skippedGroups = new TreeSet<>();
+ /** Progress for cache group. */
+ private final Map<CacheGroupContext, DefragmentationCacheGroupProgress> progressGroups;
- /** */
- public synchronized void onStart(Set<CacheGroupContext> scheduledGroups) {
+ /** Finished cache groups. */
+ private final Map<CacheGroupContext, DefragmentationCacheGroupProgress> finishedGroups;
+
+ /** Skipped cache groups. */
+ private final Set<String> skippedGroups;
+
+ /** Constructor. */
+ public Status() {
+ scheduledGroups = new TreeSet<>();
+ progressGroups = new TreeMap<>(comparing(CacheGroupContext::cacheOrGroupName));
+ finishedGroups = new TreeMap<>(comparing(CacheGroupContext::cacheOrGroupName));
+ skippedGroups = new TreeSet<>();
+ }
+
+ /** Copy constructor. */
+ public Status(
+ long startTs,
+ long finishTs,
+ Set<String> scheduledGroups,
+ Map<CacheGroupContext, DefragmentationCacheGroupProgress> progressGroups,
+ Map<CacheGroupContext, DefragmentationCacheGroupProgress> finishedGroups,
+ Set<String> skippedGroups
+ ) {
+ this.startTs = startTs;
+ this.finishTs = finishTs;
+ this.scheduledGroups = scheduledGroups;
+ this.progressGroups = progressGroups;
+ this.finishedGroups = finishedGroups;
+ this.skippedGroups = skippedGroups;
+ }
+
+ /**
+ * Mark the start of the defragmentation.
+ * @param scheduledGroups Groups scheduled for defragmentation.
+ * @param partitions Total partition count.
+ */
+ public synchronized void onStart(Set<CacheGroupContext> scheduledGroups, int partitions) {
startTs = System.currentTimeMillis();
+ totalPartitionCount = partitions;
- for (CacheGroupContext grp : scheduledGroups) {
+ for (CacheGroupContext grp : scheduledGroups)
this.scheduledGroups.add(grp.cacheOrGroupName());
- }
log.info("Defragmentation started.");
}
- /** */
- public synchronized void onCacheGroupStart(CacheGroupContext grpCtx, int parts) {
+ /**
+ * Mark the start of the cache group defragmentation.
+ * @param grpCtx Cache group context.
+ * @param parts Partition count.
+ */
+ private synchronized void onCacheGroupStart(CacheGroupContext grpCtx, int parts) {
scheduledGroups.remove(grpCtx.cacheOrGroupName());
progressGroups.put(grpCtx, new DefragmentationCacheGroupProgress(parts));
}
- /** */
- public synchronized void onPartitionDefragmented(CacheGroupContext grpCtx, long oldSize, long newSize) {
+ /**
+ * Mark the end of the partition defragmentation.
+ * @param grpCtx Cache group context.
+ * @param oldSize Old size.
+ * @param newSize New size;
+ */
+ private synchronized void onPartitionDefragmented(CacheGroupContext grpCtx, long oldSize, long newSize) {
progressGroups.get(grpCtx).onPartitionDefragmented(oldSize, newSize);
+
+ defragmentedPartitionCount++;
}
- /** */
- public synchronized void onIndexDefragmented(CacheGroupContext grpCtx, long oldSize, long newSize) {
+ /**
+ * Mark the end of the index partition defragmentation.
+ * @param grpCtx Cache group context.
+ * @param oldSize Old size.
+ * @param newSize New size;
+ */
+ private synchronized void onIndexDefragmented(CacheGroupContext grpCtx, long oldSize, long newSize) {
progressGroups.get(grpCtx).onIndexDefragmented(oldSize, newSize);
}
- /** */
- public synchronized void onCacheGroupFinish(CacheGroupContext grpCtx) {
+ /**
+ * Mark the end of the cache group defragmentation.
+ * @param grpCtx Cache group context.
+ */
+ private synchronized void onCacheGroupFinish(CacheGroupContext grpCtx) {
DefragmentationCacheGroupProgress progress = progressGroups.remove(grpCtx);
progress.onFinish();
@@ -1038,15 +1103,20 @@ public class CachePartitionDefragmentationManager {
finishedGroups.put(grpCtx, progress);
}
- /** */
- public synchronized void onCacheGroupSkipped(CacheGroupContext grpCtx) {
+ /**
+ * Mark that cache group defragmentation was skipped.
+ * @param grpCtx Cache group context.
+ */
+ private synchronized void onCacheGroupSkipped(CacheGroupContext grpCtx, int partitions) {
scheduledGroups.remove(grpCtx.cacheOrGroupName());
skippedGroups.add(grpCtx.cacheOrGroupName());
+
+ defragmentedPartitionCount += partitions;
}
- /** */
- public synchronized void onFinish() {
+ /** Mark the end of the defragmentation. */
+ private synchronized void onFinish() {
finishTs = System.currentTimeMillis();
progressGroups.clear();
@@ -1056,67 +1126,80 @@ public class CachePartitionDefragmentationManager {
log.info("Defragmentation process completed. Time: " + (finishTs - startTs) * 1e-3 + "s.");
}
- /** {@inheritDoc} */
- @Override public synchronized String toString() {
- StringBuilder sb = new StringBuilder();
-
- if (!finishedGroups.isEmpty()) {
- sb.append("Defragmentation is completed for cache groups:\n");
-
- for (Map.Entry<CacheGroupContext, DefragmentationCacheGroupProgress> entry : finishedGroups.entrySet()) {
- sb.append(" ").append(entry.getKey().cacheOrGroupName()).append(" - ");
-
- sb.append(entry.getValue().toString()).append('\n');
- }
- }
+ /** Copy object. */
+ private synchronized Status copy() {
+ return new Status(
+ startTs,
+ finishTs,
+ new HashSet<>(scheduledGroups),
+ new HashMap<>(progressGroups),
+ new HashMap<>(finishedGroups),
+ new HashSet<>(skippedGroups)
+ );
+ }
- if (!progressGroups.isEmpty()) {
- sb.append("Defragmentation is in progress for cache groups:\n");
+ /** */
+ public long getStartTs() {
+ return startTs;
+ }
- for (Map.Entry<CacheGroupContext, DefragmentationCacheGroupProgress> entry : progressGroups.entrySet()) {
- sb.append(" ").append(entry.getKey().cacheOrGroupName()).append(" - ");
+ /** */
+ public long getFinishTs() {
+ return finishTs;
+ }
- sb.append(entry.getValue().toString()).append('\n');
- }
- }
+ /** */
+ public Set<String> getScheduledGroups() {
+ return scheduledGroups;
+ }
- if (!skippedGroups.isEmpty())
- sb.append("Skipped cache groups: ").append(String.join(", ", skippedGroups)).append('\n');
+ /** */
+ public Map<CacheGroupContext, DefragmentationCacheGroupProgress> getProgressGroups() {
+ return progressGroups;
+ }
- if (!scheduledGroups.isEmpty())
- sb.append("Awaiting defragmentation: ").append(String.join(", ", scheduledGroups)).append('\n');
+ /** */
+ public Map<CacheGroupContext, DefragmentationCacheGroupProgress> getFinishedGroups() {
+ return finishedGroups;
+ }
- return sb.toString();
+ /** */
+ public Set<String> getSkippedGroups() {
+ return skippedGroups;
}
- }
- /** */
- private static class DefragmentationCacheGroupProgress {
/** */
- private static final DecimalFormat MB_FORMAT = new DecimalFormat(
- "#.##",
- DecimalFormatSymbols.getInstance(Locale.US)
- );
+ public int getTotalPartitionCount() {
+ return totalPartitionCount;
+ }
/** */
+ public int getDefragmentedPartitionCount() {
+ return defragmentedPartitionCount;
+ }
+ }
+
+ /** Cache group defragmentation progress. */
+ static class DefragmentationCacheGroupProgress {
+ /** Partition count. */
private final int partsTotal;
- /** */
+ /** Defragmented partitions. */
private int partsCompleted;
- /** */
+ /** Old cache group size. */
private long oldSize;
- /** */
+ /** New cache group size. */
private long newSize;
- /** */
+ /** Start timestamp. */
private final long startTs;
- /** */
+ /** Finish timestamp. */
private long finishTs;
- /** */
+ /** Constructor. */
public DefragmentationCacheGroupProgress(int parts) {
partsTotal = parts;
@@ -1144,43 +1227,38 @@ public class CachePartitionDefragmentationManager {
}
/** */
- public void onFinish() {
- finishTs = System.currentTimeMillis();
+ public long getOldSize() {
+ return oldSize;
}
- /** {@inheritDoc} */
- @Override public String toString() {
- StringBuilder sb = new StringBuilder();
-
- if (finishTs == 0) {
- sb.append("partitions processed/all: ").append(partsCompleted).append("/").append(partsTotal);
-
- sb.append(", time elapsed: ");
-
- appendDuration(sb, System.currentTimeMillis());
- }
- else {
- double mb = 1024 * 1024;
-
- sb.append("size before/after: ").append(MB_FORMAT.format(oldSize / mb)).append("MB/");
- sb.append(MB_FORMAT.format(newSize / mb)).append("MB");
-
- sb.append(", time took: ");
+ /** */
+ public long getNewSize() {
+ return newSize;
+ }
- appendDuration(sb, finishTs);
- }
+ /** */
+ public long getStartTs() {
+ return startTs;
+ }
- return sb.toString();
+ /** */
+ public long getFinishTs() {
+ return finishTs;
}
/** */
- private void appendDuration(StringBuilder sb, long end) {
- long duration = Math.round((end - startTs) * 1e-3);
+ public int getPartsTotal() {
+ return partsTotal;
+ }
- long mins = duration / 60;
- long secs = duration % 60;
+ /** */
+ public int getPartsCompleted() {
+ return partsCompleted;
+ }
- sb.append(mins).append(" mins ").append(secs).append(" secs");
+ /** */
+ public void onFinish() {
+ finishTs = System.currentTimeMillis();
}
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationMXBeanImpl.java
new file mode 100644
index 0000000..1e3becb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationMXBeanImpl.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.mxbean.DefragmentationMXBean;
+
+/**
+ * Defragmentation MX bean implementation.
+ */
+public class DefragmentationMXBeanImpl implements DefragmentationMXBean {
+ /** Defragmentation manager. */
+ private final IgniteDefragmentation defragmentation;
+
+ public DefragmentationMXBeanImpl(GridKernalContext ctx) {
+ this.defragmentation = ctx.defragmentation();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean schedule(String cacheNames) {
+ final List<String> caches = Arrays.stream(cacheNames.split(","))
+ .filter(s -> !s.isEmpty())
+ .collect(Collectors.toList());
+
+ try {
+ defragmentation.schedule(caches);
+
+ return true;
+ }
+ catch (IgniteCheckedException e) {
+ return false;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean cancel() {
+ try {
+ defragmentation.cancel();
+
+ return true;
+ }
+ catch (IgniteCheckedException e) {
+ return false;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean inProgress() {
+ return defragmentation.inProgress();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int processedPartitions() {
+ return defragmentation.processedPartitions();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int totalPartitions() {
+ return defragmentation.totalPartitions();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long startTime() {
+ return defragmentation.startTime();
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/IgniteDefragmentation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/IgniteDefragmentation.java
new file mode 100644
index 0000000..a5dc811
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/IgniteDefragmentation.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import java.text.DecimalFormat;
+import java.text.DecimalFormatSymbols;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * Defragmentation operation service.
+ */
+public interface IgniteDefragmentation {
+ /**
+ * Schedule defragmentaton on next start of the node.
+ *
+ * @param cacheNames Names of caches to run defragmentation on.
+ * @return Result of the scheduling.
+ * @throws IgniteCheckedException If failed.
+ */
+ ScheduleResult schedule(List<String> cacheNames) throws IgniteCheckedException;
+
+ /**
+ * Cancel scheduled or ongoing defragmentation.
+ * @return Result of the cancellation.
+ * @throws IgniteCheckedException If failed.
+ */
+ CancelResult cancel() throws IgniteCheckedException;
+
+ /**
+ * Get the status of the ongoing defragmentation.
+ * @return Defragmentation status.
+ * @throws IgniteCheckedException If failed.
+ */
+ DefragmentationStatus status() throws IgniteCheckedException;
+
+ /**
+ * @return {@code true} if there is an ongoing defragmentation.
+ */
+ boolean inProgress();
+
+ /**
+ * @return Number of processed partitions, or 0 if there is no ongoing defragmentation.
+ */
+ int processedPartitions();
+
+ /**
+ * @return Number of total partitions, or 0 if there is no ongoing defragmentation.
+ */
+ int totalPartitions();
+
+ /**
+ * @return Timestamp of the beginning of the ongoing defragmentation or 0 if there is none.
+ */
+ long startTime();
+
+ /** Result of the scheduling. */
+ public enum ScheduleResult {
+ /**
+ * Successfully scheduled.
+ */
+ SUCCESS,
+
+ /**
+ * Successfuly scheduled, superseding previously scheduled defragmentation.
+ */
+ SUCCESS_SUPERSEDED_PREVIOUS
+ }
+
+ /** Result of the cancellation. */
+ public enum CancelResult {
+ /**
+ * Cancelled scheduled defragmentation.
+ */
+ CANCELLED_SCHEDULED,
+
+ /**
+ * Nothing to cancel, no ongoing defragmentation.
+ */
+ SCHEDULED_NOT_FOUND,
+
+ /**
+ * Cancelled ongoing defragmentation.
+ */
+ CANCELLED,
+
+ /**
+ * Defragmentation is already completed or cancelled.
+ */
+ COMPLETED_OR_CANCELLED
+ }
+
+ /** */
+ public static class DefragmentationStatus {
+ /** */
+ private final Map<String, CompletedDefragmentationInfo> completedCaches;
+
+ /** */
+ private final Map<String, InProgressDefragmentationInfo> inProgressCaches;
+
+ /** */
+ private final Set<String> awaitingCaches;
+
+ /** */
+ private final Set<String> skippedCaches;
+
+ /** */
+ private final int totalPartitions;
+
+ /** */
+ private final int processedPartitions;
+
+ /** */
+ private final long startTs;
+
+ /** */
+ private final long totalElapsedTime;
+
+ public DefragmentationStatus(
+ Map<String, CompletedDefragmentationInfo> completedCaches,
+ Map<String, InProgressDefragmentationInfo> inProgressCaches,
+ Set<String> awaitingCaches,
+ Set<String> skippedCaches,
+ int totalPartitions,
+ int processedPartitions,
+ long startTs,
+ long totalElapsedTime
+ ) {
+ this.completedCaches = completedCaches;
+ this.inProgressCaches = inProgressCaches;
+ this.awaitingCaches = awaitingCaches;
+ this.skippedCaches = skippedCaches;
+ this.totalPartitions = totalPartitions;
+ this.processedPartitions = processedPartitions;
+ this.startTs = startTs;
+ this.totalElapsedTime = totalElapsedTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ StringBuilder sb = new StringBuilder();
+
+ if (!completedCaches.isEmpty()) {
+ sb.append("Defragmentation is completed for cache groups:\n");
+
+ for (Map.Entry<String, CompletedDefragmentationInfo> entry : completedCaches.entrySet()) {
+ sb.append(" ").append(entry.getKey()).append(" - ");
+
+ sb.append(entry.getValue().toString()).append('\n');
+ }
+ }
+
+ if (!inProgressCaches.isEmpty()) {
+ sb.append("Defragmentation is in progress for cache groups:\n");
+
+ for (Map.Entry<String, InProgressDefragmentationInfo> entry : inProgressCaches.entrySet()) {
+ sb.append(" ").append(entry.getKey()).append(" - ");
+
+ sb.append(entry.getValue().toString()).append('\n');
+ }
+ }
+
+ if (!skippedCaches.isEmpty())
+ sb.append("Skipped cache groups: ").append(String.join(", ", skippedCaches)).append('\n');
+
+ if (!awaitingCaches.isEmpty())
+ sb.append("Awaiting defragmentation: ").append(String.join(", ", awaitingCaches)).append('\n');
+
+ return sb.toString();
+ }
+
+ /** */
+ public Map<String, CompletedDefragmentationInfo> getCompletedCaches() {
+ return completedCaches;
+ }
+
+ /** */
+ public Map<String, InProgressDefragmentationInfo> getInProgressCaches() {
+ return inProgressCaches;
+ }
+
+ /** */
+ public Set<String> getAwaitingCaches() {
+ return awaitingCaches;
+ }
+
+ /** */
+ public Set<String> getSkippedCaches() {
+ return skippedCaches;
+ }
+
+ /** */
+ public long getTotalElapsedTime() {
+ return totalElapsedTime;
+ }
+
+ /** */
+ public int getTotalPartitions() {
+ return totalPartitions;
+ }
+
+ /** */
+ public int getProcessedPartitions() {
+ return processedPartitions;
+ }
+
+ /** */
+ public long getStartTs() {
+ return startTs;
+ }
+ }
+
+ /** */
+ abstract class DefragmentationInfo {
+ /** */
+ long elapsedTime;
+
+ public DefragmentationInfo(long elapsedTime) {
+ this.elapsedTime = elapsedTime;
+ }
+
+ /** */
+ void appendDuration(StringBuilder sb, long elapsedTime) {
+ long duration = Math.round(elapsedTime * 1e-3);
+
+ long mins = duration / 60;
+ long secs = duration % 60;
+
+ sb.append(mins).append(" mins ").append(secs).append(" secs");
+ }
+
+ /** */
+ public long getElapsedTime() {
+ return elapsedTime;
+ }
+ }
+
+ /** */
+ public static class CompletedDefragmentationInfo extends DefragmentationInfo {
+ /** */
+ private static final DecimalFormat MB_FORMAT = new DecimalFormat(
+ "#.##",
+ DecimalFormatSymbols.getInstance(Locale.US)
+ );
+
+ /** */
+ long sizeBefore;
+
+ /** */
+ long sizeAfter;
+
+ public CompletedDefragmentationInfo(long elapsedTime, long sizeBefore, long sizeAfter) {
+ super(elapsedTime);
+ this.sizeBefore = sizeBefore;
+ this.sizeAfter = sizeAfter;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ StringBuilder sb = new StringBuilder();
+
+ double mb = 1024 * 1024;
+
+ sb.append("size before/after: ").append(MB_FORMAT.format(sizeBefore / mb)).append("MB/");
+ sb.append(MB_FORMAT.format(sizeAfter / mb)).append("MB");
+
+ sb.append(", time took: ");
+
+ appendDuration(sb, elapsedTime);
+
+ return sb.toString();
+ }
+
+ /** */
+ public long getSizeBefore() {
+ return sizeBefore;
+ }
+
+ /** */
+ public long getSizeAfter() {
+ return sizeAfter;
+ }
+ }
+
+ /** */
+ public static class InProgressDefragmentationInfo extends DefragmentationInfo {
+ /** */
+ int processedPartitions;
+
+ /** */
+ int totalPartitions;
+
+ public InProgressDefragmentationInfo(long elapsedTime, int processedPartitions, int totalPartitions) {
+ super(elapsedTime);
+ this.processedPartitions = processedPartitions;
+ this.totalPartitions = totalPartitions;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("partitions processed/all: ").append(processedPartitions).append("/").append(totalPartitions);
+
+ sb.append(", time elapsed: ");
+
+ appendDuration(sb, elapsedTime);
+
+ return sb.toString();
+ }
+
+ /** */
+ public int getProcessedPartitions() {
+ return processedPartitions;
+ }
+
+ /** */
+ public int getTotalPartitions() {
+ return totalPartitions;
+ }
+ }
+
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/IgniteDefragmentationImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/IgniteDefragmentationImpl.java
new file mode 100644
index 0000000..5c443ba
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/IgniteDefragmentationImpl.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.Status;
+import org.apache.ignite.maintenance.MaintenanceAction;
+import org.apache.ignite.maintenance.MaintenanceRegistry;
+import org.apache.ignite.maintenance.MaintenanceTask;
+
+import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.DEFRAGMENTATION_MNTC_TASK_NAME;
+import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance.DefragmentationParameters.toStore;
+
+/**
+ * Defragmentation operation service implementation.
+ */
+public class IgniteDefragmentationImpl implements IgniteDefragmentation {
+ /** Kernal context. */
+ private final GridKernalContext ctx;
+
+ public IgniteDefragmentationImpl(GridKernalContext ctx) {
+ this.ctx = ctx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ScheduleResult schedule(List<String> cacheNames) throws IgniteCheckedException {
+ final MaintenanceRegistry maintenanceRegistry = ctx.maintenanceRegistry();
+
+ MaintenanceTask oldTask;
+
+ try {
+ oldTask = maintenanceRegistry.registerMaintenanceTask(toStore(cacheNames != null ? cacheNames : Collections.emptyList()));
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteCheckedException("Scheduling failed: " + e.getMessage());
+ }
+
+ return oldTask != null ? ScheduleResult.SUCCESS_SUPERSEDED_PREVIOUS : ScheduleResult.SUCCESS;
+ }
+
+ /** {@inheritDoc} */
+ @Override public CancelResult cancel() throws IgniteCheckedException {
+ final MaintenanceRegistry maintenanceRegistry = ctx.maintenanceRegistry();
+
+ if (!maintenanceRegistry.isMaintenanceMode()) {
+ boolean deleted = maintenanceRegistry.unregisterMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME);
+
+ return deleted ? CancelResult.CANCELLED_SCHEDULED : CancelResult.SCHEDULED_NOT_FOUND;
+ }
+ else {
+ List<MaintenanceAction<?>> actions;
+
+ try {
+ actions = maintenanceRegistry.actionsForMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME);
+ }
+ catch (IgniteException e) {
+ return CancelResult.COMPLETED_OR_CANCELLED;
+ }
+
+ Optional<MaintenanceAction<?>> stopAct = actions.stream().filter(a -> "stop".equals(a.name())).findAny();
+
+ assert stopAct.isPresent();
+
+ try {
+ Object res = stopAct.get().execute();
+
+ assert res instanceof Boolean;
+
+ boolean cancelled = (Boolean)res;
+
+ return cancelled ? CancelResult.CANCELLED : CancelResult.COMPLETED_OR_CANCELLED;
+ }
+ catch (Exception e) {
+ throw new IgniteCheckedException("Exception occurred: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public DefragmentationStatus status() throws IgniteCheckedException {
+ final MaintenanceRegistry maintenanceRegistry = ctx.maintenanceRegistry();
+
+ if (!maintenanceRegistry.isMaintenanceMode())
+ throw new IgniteCheckedException("Node is not in maintenance mode.");
+
+ IgniteCacheDatabaseSharedManager dbMgr = ctx.cache().context().database();
+
+ assert dbMgr instanceof GridCacheDatabaseSharedManager;
+
+ CachePartitionDefragmentationManager defrgMgr = ((GridCacheDatabaseSharedManager)dbMgr)
+ .defragmentationManager();
+
+ if (defrgMgr == null)
+ throw new IgniteCheckedException("There's no active defragmentation process on the node.");
+
+ final Status status = defrgMgr.status();
+
+ final long startTs = status.getStartTs();
+ final long finishTs = status.getFinishTs();
+ final long elapsedTime = finishTs != 0 ? finishTs - startTs : System.currentTimeMillis() - startTs;
+
+ Map<String, CompletedDefragmentationInfo> completedCaches = new HashMap<>();
+ Map<String, InProgressDefragmentationInfo> progressCaches = new HashMap<>();
+
+ status.getFinishedGroups().forEach((context, progress) -> {
+ final String name = context.cacheOrGroupName();
+
+ final long oldSize = progress.getOldSize();
+ final long newSize = progress.getNewSize();
+ final long cgElapsedTime = progress.getFinishTs() - progress.getStartTs();
+
+ final CompletedDefragmentationInfo info = new CompletedDefragmentationInfo(cgElapsedTime, oldSize, newSize);
+ completedCaches.put(name, info);
+ });
+
+ status.getProgressGroups().forEach((context, progress) -> {
+ final String name = context.cacheOrGroupName();
+
+ final long cgElapsedTime = System.currentTimeMillis() - progress.getStartTs();
+ final int partsTotal = progress.getPartsTotal();
+ final int partsCompleted = progress.getPartsCompleted();
+
+ final InProgressDefragmentationInfo info = new InProgressDefragmentationInfo(cgElapsedTime, partsCompleted, partsTotal);
+ progressCaches.put(name, info);
+ });
+
+ return new DefragmentationStatus(
+ completedCaches,
+ progressCaches,
+ status.getScheduledGroups(),
+ status.getSkippedGroups(),
+ status.getTotalPartitionCount(),
+ status.getDefragmentedPartitionCount(),
+ startTs,
+ elapsedTime
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean inProgress() {
+ final Status status = getStatus();
+
+ return status != null && status.getFinishTs() == 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int processedPartitions() {
+ final Status status = getStatus();
+
+ if (status == null)
+ return 0;
+
+ return status.getDefragmentedPartitionCount();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int totalPartitions() {
+ final CachePartitionDefragmentationManager.Status status = getStatus();
+
+ if (status == null)
+ return 0;
+
+ return status.getTotalPartitionCount();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long startTime() {
+ final CachePartitionDefragmentationManager.Status status = getStatus();
+
+ if (status == null)
+ return 0;
+
+ return status.getStartTs();
+ }
+
+ /**
+ * Get defragmentation status.
+ * @return Defragmentation status or {@code null} if there is no ongoing defragmentation.
+ */
+ private Status getStatus() {
+ final MaintenanceRegistry maintenanceRegistry = ctx.maintenanceRegistry();
+
+ if (!maintenanceRegistry.isMaintenanceMode())
+ return null;
+
+ IgniteCacheDatabaseSharedManager dbMgr = ctx.cache().context().database();
+
+ assert dbMgr instanceof GridCacheDatabaseSharedManager;
+
+ CachePartitionDefragmentationManager defrgMgr = ((GridCacheDatabaseSharedManager) dbMgr)
+ .defragmentationManager();
+
+ if (defrgMgr == null)
+ return null;
+
+ return defrgMgr.status();
+ }
+
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index 958053b..ad09571 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.authentication.IgniteAuthentication
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor;
+import org.apache.ignite.internal.processors.cache.persistence.defragmentation.IgniteDefragmentation;
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFoldersResolver;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
@@ -492,6 +493,11 @@ public class StandaloneGridKernalContext implements GridKernalContext {
}
/** {@inheritDoc} */
+ @Override public IgniteDefragmentation defragmentation() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
@Override public WorkersRegistry workersRegistry() {
return null;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTask.java
index 14cea62..88fde8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTask.java
@@ -17,27 +17,17 @@
package org.apache.ignite.internal.visor.defragmentation;
-import java.util.Collections;
import java.util.List;
-import java.util.Optional;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.compute.ComputeJobResult;
-import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
-import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
-import org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager;
+import org.apache.ignite.internal.processors.cache.persistence.defragmentation.IgniteDefragmentation;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.processors.task.GridVisorManagementTask;
import org.apache.ignite.internal.visor.VisorJob;
import org.apache.ignite.internal.visor.VisorMultiNodeTask;
-import org.apache.ignite.maintenance.MaintenanceAction;
-import org.apache.ignite.maintenance.MaintenanceRegistry;
-import org.apache.ignite.maintenance.MaintenanceTask;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.DEFRAGMENTATION_MNTC_TASK_NAME;
-import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance.DefragmentationParameters.toStore;
-
/** */
@GridInternal
@GridVisorManagementTask
@@ -120,91 +110,71 @@ public class VisorDefragmentationTask extends VisorMultiNodeTask
/** */
private VisorDefragmentationTaskResult runSchedule(VisorDefragmentationTaskArg arg) {
- MaintenanceRegistry mntcReg = ignite.context().maintenanceRegistry();
+ final IgniteDefragmentation defragmentation = ignite.context().defragmentation();
- MaintenanceTask oldTask;
+ final IgniteDefragmentation.ScheduleResult scheduleResult;
try {
- List<String> cacheNames = arg.cacheNames();
-
- oldTask = mntcReg.registerMaintenanceTask(toStore(cacheNames == null ? Collections.emptyList() : cacheNames));
+ scheduleResult = defragmentation.schedule(arg.cacheNames());
}
catch (IgniteCheckedException e) {
- return new VisorDefragmentationTaskResult(false, "Scheduling failed: " + e.getMessage());
+ return new VisorDefragmentationTaskResult(false, e.getMessage());
}
- return new VisorDefragmentationTaskResult(
- true,
- "Scheduling completed successfully." +
- (oldTask == null ? "" : " Previously scheduled task has been removed.")
- );
+ String message;
+
+ switch (scheduleResult) {
+ case SUCCESS_SUPERSEDED_PREVIOUS:
+ message = "Scheduling completed successfully. Previously scheduled task has been removed.";
+ break;
+ case SUCCESS:
+ default:
+ message = "Scheduling completed successfully.";
+ break;
+ }
+
+ return new VisorDefragmentationTaskResult(true, message);
}
/** */
private VisorDefragmentationTaskResult runStatus(VisorDefragmentationTaskArg arg) {
- MaintenanceRegistry mntcReg = ignite.context().maintenanceRegistry();
-
- if (!mntcReg.isMaintenanceMode())
- return new VisorDefragmentationTaskResult(false, "Node is not in maintenance node.");
-
- IgniteCacheDatabaseSharedManager dbMgr = ignite.context().cache().context().database();
+ final IgniteDefragmentation defragmentation = ignite.context().defragmentation();
- assert dbMgr instanceof GridCacheDatabaseSharedManager;
-
- CachePartitionDefragmentationManager defrgMgr = ((GridCacheDatabaseSharedManager)dbMgr)
- .defragmentationManager();
-
- if (defrgMgr == null)
- return new VisorDefragmentationTaskResult(true, "There's no active defragmentation process on the node.");
-
- return new VisorDefragmentationTaskResult(true, defrgMgr.status());
+ try {
+ return new VisorDefragmentationTaskResult(true, defragmentation.status().toString());
+ } catch (IgniteCheckedException e) {
+ return new VisorDefragmentationTaskResult(false, e.getMessage());
+ }
}
/** */
private VisorDefragmentationTaskResult runCancel(VisorDefragmentationTaskArg arg) {
- assert arg.cacheNames() == null : "Cancelling specific caches is not yet implemented";
-
- MaintenanceRegistry mntcReg = ignite.context().maintenanceRegistry();
-
- if (!mntcReg.isMaintenanceMode()) {
- boolean deleted = mntcReg.unregisterMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME);
+ final IgniteDefragmentation defragmentation = ignite.context().defragmentation();
- String msg = deleted
- ? "Scheduled defragmentation task cancelled successfully."
- : "Scheduled defragmentation task is not found.";
-
- return new VisorDefragmentationTaskResult(true, msg);
- }
- else {
- List<MaintenanceAction<?>> actions;
-
- try {
- actions = mntcReg.actionsForMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME);
- }
- catch (IgniteException e) {
- return new VisorDefragmentationTaskResult(true, "Defragmentation is already completed or has been cancelled previously.");
+ try {
+ final IgniteDefragmentation.CancelResult cancelResult = defragmentation.cancel();
+
+ String message;
+
+ switch (cancelResult) {
+ case SCHEDULED_NOT_FOUND:
+ message = "Scheduled defragmentation task is not found.";
+ break;
+ case CANCELLED:
+ message = "Defragmentation cancelled successfully.";
+ break;
+ case COMPLETED_OR_CANCELLED:
+ message = "Defragmentation is already completed or has been cancelled previously.";
+ break;
+ case CANCELLED_SCHEDULED:
+ default:
+ message = "Scheduled defragmentation task cancelled successfully.";
+ break;
}
- Optional<MaintenanceAction<?>> stopAct = actions.stream().filter(a -> "stop".equals(a.name())).findAny();
-
- assert stopAct.isPresent();
-
- try {
- Object res = stopAct.get().execute();
-
- assert res instanceof Boolean;
-
- boolean cancelled = (Boolean)res;
-
- String msg = cancelled
- ? "Defragmentation cancelled successfully."
- : "Defragmentation is already completed or has been cancelled previously.";
-
- return new VisorDefragmentationTaskResult(true, msg);
- }
- catch (Exception e) {
- return new VisorDefragmentationTaskResult(false, "Exception occurred: " + e.getMessage());
- }
+ return new VisorDefragmentationTaskResult(true, message);
+ } catch (IgniteCheckedException e) {
+ return new VisorDefragmentationTaskResult(false, e.getMessage());
}
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskArg.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskArg.java
index 1b1c8b1..9e6ec53 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskArg.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskArg.java
@@ -34,9 +34,6 @@ public class VisorDefragmentationTaskArg extends IgniteDataTransferObject {
private VisorDefragmentationOperation operation;
/** */
- private List<String> nodeIds;
-
- /** */
private List<String> cacheNames;
/** Empty constructor for serialization. */
@@ -47,12 +44,10 @@ public class VisorDefragmentationTaskArg extends IgniteDataTransferObject {
/** */
public VisorDefragmentationTaskArg(
VisorDefragmentationOperation operation,
- List<String> nodeIds,
List<String> cacheNames
) {
this.operation = operation;
- this.nodeIds = nodeIds;
this.cacheNames = cacheNames;
}
@@ -62,11 +57,6 @@ public class VisorDefragmentationTaskArg extends IgniteDataTransferObject {
}
/** */
- public List<String> nodeIds() {
- return nodeIds;
- }
-
- /** */
public List<String> cacheNames() {
return cacheNames;
}
@@ -75,8 +65,6 @@ public class VisorDefragmentationTaskArg extends IgniteDataTransferObject {
@Override protected void writeExternalData(ObjectOutput out) throws IOException {
U.writeEnum(out, operation);
- U.writeCollection(out, nodeIds);
-
U.writeCollection(out, cacheNames);
}
@@ -84,8 +72,6 @@ public class VisorDefragmentationTaskArg extends IgniteDataTransferObject {
@Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException {
operation = U.readEnum(in, VisorDefragmentationOperation.class);
- nodeIds = U.readList(in);
-
cacheNames = U.readList(in);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/DefragmentationMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/DefragmentationMXBean.java
new file mode 100644
index 0000000..22a5e2d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/DefragmentationMXBean.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mxbean;
+
+/**
+ * JMX bean for defragmentation manager.
+ */
+@MXBeanDescription("MBean that provides access for defragmentation features.")
+public interface DefragmentationMXBean {
+ /**
+ * Schedule defragmentation for given caches.
+ *
+ * @param cacheNames Names of caches to run defragmentation on, comma separated.
+ * @return {@code true} if defragmentation is scheduled, {@code false} otherwise.
+ */
+ @MXBeanDescription("Schedule defragmentation.")
+ public boolean schedule(@MXBeanParameter(name = "cacheNames", description = "Names of caches to run defragmentation on.") String cacheNames);
+
+ /**
+ * Cancel defragmentation.
+ *
+ * @return {@code true} if defragmentation was canceled, {@code false} otherwise.
+ */
+ @MXBeanDescription("Cancel current defragmentation.")
+ public boolean cancel();
+
+ /**
+ * Get defragmentation status.
+ *
+ * @return {@code true} if defragmentation is in progress right now.
+ */
+ @MXBeanDescription("Cancel current defragmentation.")
+ public boolean inProgress();
+
+ /**
+ * Get count of processed partitions.
+ *
+ * @return {@code true} if defragmentation is in progress right now.
+ */
+ @MXBeanDescription("Processed partitions.")
+ public int processedPartitions();
+
+ /**
+ * Get total count of partitions.
+ *
+ * @return {@code true} if defragmentation is in progress right now.
+ */
+ @MXBeanDescription("Total partitions.")
+ public int totalPartitions();
+
+ /**
+ * Get defragmentation's start time.
+ *
+ * @return {@code true} if defragmentation is in progress right now.
+ */
+ @MXBeanDescription("Start time.")
+ public long startTime();
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationMXBeanTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationMXBeanTest.java
new file mode 100644
index 0000000..f1e5c77
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationMXBeanTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.UnaryOperator;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance.DefragmentationParameters;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.maintenance.MaintenanceTask;
+import org.apache.ignite.mxbean.DefragmentationMXBean;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+
+/**
+ * Tests for defragmentation JMX bean.
+ */
+public class DefragmentationMXBeanTest extends GridCommonAbstractTest {
+ /** */
+ private static CountDownLatch blockCdl;
+
+ /** */
+ private static CountDownLatch waitCdl;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ final DataStorageConfiguration dsCfg = new DataStorageConfiguration();
+
+ dsCfg.setWalSegmentSize(512 * 1024).setWalSegments(3);
+ dsCfg.setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration().setMaxSize(50L * 1024 * 1024).setPersistenceEnabled(true)
+ );
+
+ return cfg.setDataStorageConfiguration(dsCfg);
+ }
+
+ /**
+ * Test that defragmentation won't be scheduled second time, if previously scheduled via maintenance registry.
+ * Description:
+ * 1. Start two nodes.
+ * 2. Register defragmentation maintenance task on the first node.
+ * 3. Restart node.
+ * 3. Scheduling of the defragmentation on the first node via JMX bean should fail.
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDefragmentationSchedule() throws Exception {
+ Ignite ignite = startGrids(2);
+
+ ignite.cluster().state(ACTIVE);
+
+ DefragmentationMXBean mxBean = defragmentationMXBean(ignite.name());
+
+ assertTrue(mxBean.schedule(""));
+
+ MaintenanceTask mntcTask = DefragmentationParameters.toStore(Collections.emptyList());
+
+ assertNotNull(grid(0).context().maintenanceRegistry().registerMaintenanceTask(mntcTask));
+ assertNull(grid(1).context().maintenanceRegistry().registerMaintenanceTask(mntcTask));
+
+ stopGrid(0);
+ startGrid(0);
+
+ // node is already in defragmentation mode, hence scheduling is not possible
+ assertFalse(mxBean.schedule(""));
+ }
+
+ /**
+ * Test that defragmentation can be successfuly cancelled via JMX bean.
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDefragmentationCancel() throws Exception {
+ Ignite ignite = startGrids(2);
+
+ ignite.cluster().state(ACTIVE);
+
+ DefragmentationMXBean mxBean = defragmentationMXBean(ignite.name());
+
+ mxBean.schedule("");
+
+ assertTrue(mxBean.cancel());
+
+ // subsequent cancel call should be successful
+ assertTrue(mxBean.cancel());
+ }
+
+ /**
+ * Test that ongong defragmentation can be stopped via JMX bean.
+ * Description:
+ * 1. Start one node.
+ * 2. Put a load of a data on it.
+ * 3. Schedule defragmentation.
+ * 4. Make IO factory slow down after 128 partitions are processed, so we have time to stop the defragmentation.
+ * 5. Stop the defragmentation.
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDefragmentationCancelInProgress() throws Exception {
+ IgniteEx ig = startGrid(0);
+
+ ig.cluster().state(ClusterState.ACTIVE);
+
+ IgniteCache<Object, Object> cache = ig.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ for (int i = 0; i < 1024; i++)
+ cache.put(i, i);
+
+ forceCheckpoint(ig);
+
+ DefragmentationMXBean mxBean = defragmentationMXBean(ig.name());
+
+ mxBean.schedule("");
+
+ stopGrid(0);
+
+ blockCdl = new CountDownLatch(128);
+
+ UnaryOperator<IgniteConfiguration> cfgOp = cfg -> {
+ DataStorageConfiguration dsCfg = cfg.getDataStorageConfiguration();
+
+ FileIOFactory delegate = dsCfg.getFileIOFactory();
+
+ dsCfg.setFileIOFactory((file, modes) -> {
+ if (file.getName().contains("dfrg")) {
+ if (blockCdl.getCount() == 0) {
+ try {
+ // Slow down defragmentation process.
+ // This'll be enough for the test since we have, like, 900 partitions left.
+ Thread.sleep(100);
+ }
+ catch (InterruptedException ignore) {
+ // No-op.
+ }
+ }
+ else
+ blockCdl.countDown();
+ }
+
+ return delegate.create(file, modes);
+ });
+
+ return cfg;
+ };
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
+ try {
+ startGrid(0, cfgOp);
+ }
+ catch (Exception e) {
+ // No-op.
+ throw new RuntimeException(e);
+ }
+ });
+
+ blockCdl.await();
+
+ mxBean = defragmentationMXBean(ig.name());
+
+ assertTrue(mxBean.cancel());
+
+ fut.get();
+
+ assertTrue(mxBean.cancel());
+ }
+
+ /**
+ * Test that JMX bean provides correct defragmentation status.
+ * Description:
+ * 1. Start one node,
+ * 2. Put a load of data on it.
+ * 3. Schedule defragmentation.
+ * 4. Completely stop defragmentation when 128 partitions processed.
+ * 5. Check defragmentation status.
+ * 6. Continue defragmentation and wait for it to end.
+ * 7. Check defragmentation finished.
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDefragmentationStatus() throws Exception {
+ IgniteEx ig = startGrid(0);
+
+ ig.cluster().state(ClusterState.ACTIVE);
+
+ ig.getOrCreateCache(DEFAULT_CACHE_NAME + "1");
+
+ IgniteCache<Object, Object> cache = ig.getOrCreateCache(DEFAULT_CACHE_NAME + "2");
+
+ ig.getOrCreateCache(DEFAULT_CACHE_NAME + "3");
+
+ for (int i = 0; i < 1024; i++)
+ cache.put(i, i);
+
+ forceCheckpoint(ig);
+
+ DefragmentationMXBean mxBean = defragmentationMXBean(ig.name());
+
+ mxBean.schedule("");
+
+ stopGrid(0);
+
+ blockCdl = new CountDownLatch(128);
+ waitCdl = new CountDownLatch(1);
+
+ UnaryOperator<IgniteConfiguration> cfgOp = cfg -> {
+ DataStorageConfiguration dsCfg = cfg.getDataStorageConfiguration();
+
+ FileIOFactory delegate = dsCfg.getFileIOFactory();
+
+ dsCfg.setFileIOFactory((file, modes) -> {
+ if (file.getName().contains("dfrg")) {
+ if (blockCdl.getCount() == 0) {
+ try {
+ waitCdl.await();
+ }
+ catch (InterruptedException ignore) {
+ // No-op.
+ }
+ }
+ else
+ blockCdl.countDown();
+ }
+
+ return delegate.create(file, modes);
+ });
+
+ return cfg;
+ };
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
+ try {
+ startGrid(0, cfgOp);
+ }
+ catch (Exception e) {
+ // No-op.
+ throw new RuntimeException(e);
+ }
+ });
+
+ blockCdl.await();
+
+ mxBean = defragmentationMXBean(ig.name());
+
+ final IgniteKernal gridx = IgnitionEx.gridx(ig.name());
+ final IgniteDefragmentation defragmentation = gridx.context().defragmentation();
+ final IgniteDefragmentation.DefragmentationStatus status1 = defragmentation.status();
+
+ assertEquals(status1.getStartTs(), mxBean.startTime());
+
+ assertTrue(mxBean.inProgress());
+ assertEquals(126, mxBean.processedPartitions());
+ final int totalPartitions = status1.getTotalPartitions();
+ assertEquals(totalPartitions, mxBean.totalPartitions());
+
+ waitCdl.countDown();
+
+ fut.get();
+
+ ((GridCacheDatabaseSharedManager) grid(0).context().cache().context().database())
+ .defragmentationManager()
+ .completionFuture()
+ .get();
+
+ assertFalse(mxBean.inProgress());
+ assertEquals(totalPartitions, mxBean.processedPartitions());
+ }
+
+ /**
+ * Get defragmentation JMX bean.
+ * @param name Ignite instance name.
+ * @return Defragmentation JMX bean.
+ */
+ private DefragmentationMXBean defragmentationMXBean(String name) {
+ return getMxBean(
+ name,
+ "Defragmentation",
+ DefragmentationMXBeanImpl.class,
+ DefragmentationMXBean.class
+ );
+ }
+
+}