Posted to commits@ignite.apache.org by se...@apache.org on 2020/12/09 07:06:03 UTC
[ignite] branch master updated: IGNITE-13709 Control.sh API - status command for defragmentation feature - Fixes #8548.
This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 493759a IGNITE-13709 Control.sh API - status command for defragmentation feature - Fixes #8548.
493759a is described below
commit 493759a5aa43d2a026b56552cf26cd72e0ed96a9
Author: ibessonov <be...@gmail.com>
AuthorDate: Wed Dec 9 09:54:58 2020 +0300
IGNITE-13709 Control.sh API - status command for defragmentation feature - Fixes #8548.
Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
.../commandline/DefragmentationCommand.java | 13 +-
.../GridCommandHandlerDefragmentationTest.java | 141 ++++-
.../CachePartitionDefragmentationManager.java | 577 +++++++++++++++------
3 files changed, 564 insertions(+), 167 deletions(-)
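In short, the change unblocks the previously stubbed "status" subcommand of the control.sh defragmentation command and makes the optional arguments of "schedule" genuinely optional. A minimal sketch of driving both subcommands through CommandHandler, mirroring what the new test below does; the consistent ID and port are placeholders, and on the command line the equivalent calls would be "control.sh --defragmentation schedule --nodes <consistentId>" and "control.sh --port <port> --defragmentation status":

    import java.util.Arrays;

    import org.apache.ignite.internal.commandline.CommandHandler;

    public class DefragmentationControlExample {
        public static void main(String[] args) {
            CommandHandler hnd = new CommandHandler();

            // Schedule defragmentation for one node; "node-1" is a placeholder consistent ID.
            int scheduleCode = hnd.execute(Arrays.asList(
                "--defragmentation", "schedule", "--nodes", "node-1"));

            // After the node restarts in maintenance mode, poll its progress
            // through the REST port (11211 is a placeholder).
            int statusCode = hnd.execute(Arrays.asList(
                "--port", "11211", "--defragmentation", "status"));

            System.out.println("schedule: " + scheduleCode + ", status: " + statusCode);
        }
    }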
diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java
index c2fa8e9..ec5c1f0 100644
--- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java
+++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java
@@ -111,7 +111,7 @@ public class DefragmentationCommand implements Command<DefragmentationArguments>
@Override public void parseArguments(CommandArgIterator argIter) {
DefragmentationSubcommands cmd = DefragmentationSubcommands.of(argIter.nextArg("Expected defragmentation subcommand."));
- if (cmd == null || cmd == DefragmentationSubcommands.STATUS) // Status subcommand is not yet completed.
+ if (cmd == null)
throw new IllegalArgumentException("Expected correct defragmentation subcommand.");
args = new DefragmentationArguments(cmd);
@@ -124,10 +124,17 @@ public class DefragmentationCommand implements Command<DefragmentationArguments>
String subarg;
do {
- subarg = argIter.nextArg("Expected one of subcommand arguments.").toLowerCase(Locale.ENGLISH);
+ subarg = argIter.peekNextArg();
+
+ if (subarg == null)
+ break;
+
+ subarg = subarg.toLowerCase(Locale.ENGLISH);
switch (subarg) {
case NODES_ARG: {
+ argIter.nextArg("");
+
Set<String> ids = argIter.nextStringSet(NODES_ARG);
if (ids.isEmpty())
@@ -139,6 +146,8 @@ public class DefragmentationCommand implements Command<DefragmentationArguments>
}
case CACHES_ARG: {
+ argIter.nextArg("");
+
Set<String> ids = argIter.nextStringSet(CACHES_ARG);
if (ids.isEmpty())
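The core of the parsing change above is a peek-then-consume loop: the iterator is peeked first, so a missing optional argument simply ends the loop instead of throwing, and each recognized token is only consumed inside its switch branch. A simplified sketch of that pattern as a fragment of parseArguments (validation and storage of the parsed sets is elided, and the exact termination condition is illustrative):

    String subarg;

    do {
        subarg = argIter.peekNextArg(); // Look ahead without consuming the token.

        if (subarg == null)
            break; // The optional arguments were omitted entirely.

        subarg = subarg.toLowerCase(Locale.ENGLISH);

        switch (subarg) {
            case NODES_ARG: {
                argIter.nextArg(""); // Actually consume the "--nodes" token.

                Set<String> nodeIds = argIter.nextStringSet(NODES_ARG);

                // ... fail on an empty set, otherwise store the node IDs in args ...

                break;
            }
            case CACHES_ARG: {
                argIter.nextArg(""); // Actually consume the "--caches" token.

                Set<String> cacheNames = argIter.nextStringSet(CACHES_ARG);

                // ... fail on an empty set, otherwise store the cache names in args ...

                break;
            }

            default:
                subarg = null; // Unknown token: leave it for the outer parser.
        }
    }
    while (subarg != null);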
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerDefragmentationTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerDefragmentationTest.java
index adce9e2..c2ea3c2 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerDefragmentationTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerDefragmentationTest.java
@@ -17,13 +17,16 @@
package org.apache.ignite.util;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.UnaryOperator;
import java.util.logging.Formatter;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.logging.StreamHandler;
+import java.util.regex.Pattern;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cluster.ClusterState;
@@ -33,6 +36,7 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.commandline.CommandHandler;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance.DefragmentationParameters;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.maintenance.MaintenanceTask;
@@ -50,6 +54,9 @@ public class GridCommandHandlerDefragmentationTest extends GridCommandHandlerClu
/** */
private static CountDownLatch blockCdl;
+ /** */
+ private static CountDownLatch waitCdl;
+
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
@@ -205,7 +212,9 @@ public class GridCommandHandlerDefragmentationTest extends GridCommandHandlerClu
assertTrue(logLsnr.check());
}
- /** */
+ /**
+ * @throws Exception If failed.
+ */
@Test
public void testDefragmentationCancelInProgress() throws Exception {
IgniteEx ig = startGrid(0);
@@ -311,6 +320,136 @@ public class GridCommandHandlerDefragmentationTest extends GridCommandHandlerClu
assertTrue(logLsnr.check());
}
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDefragmentationStatus() throws Exception {
+ IgniteEx ig = startGrid(0);
+
+ ig.cluster().state(ClusterState.ACTIVE);
+
+ ig.getOrCreateCache(DEFAULT_CACHE_NAME + "1");
+
+ IgniteCache<Object, Object> cache = ig.getOrCreateCache(DEFAULT_CACHE_NAME + "2");
+
+ ig.getOrCreateCache(DEFAULT_CACHE_NAME + "3");
+
+ for (int i = 0; i < 1024; i++)
+ cache.put(i, i);
+
+ forceCheckpoint(ig);
+
+ String grid0ConsId = ig.configuration().getConsistentId().toString();
+
+ ListeningTestLogger testLog = new ListeningTestLogger();
+
+ CommandHandler cmd = createCommandHandler(testLog);
+
+ assertEquals(EXIT_CODE_OK, execute(
+ cmd,
+ "--defragmentation",
+ "schedule",
+ "--nodes",
+ grid0ConsId
+ ));
+
+ String port = grid(0).localNode().attribute(IgniteNodeAttributes.ATTR_REST_TCP_PORT).toString();
+
+ stopGrid(0);
+
+ blockCdl = new CountDownLatch(128);
+ waitCdl = new CountDownLatch(1);
+
+ UnaryOperator<IgniteConfiguration> cfgOp = cfg -> {
+ DataStorageConfiguration dsCfg = cfg.getDataStorageConfiguration();
+
+ FileIOFactory delegate = dsCfg.getFileIOFactory();
+
+ dsCfg.setFileIOFactory((file, modes) -> {
+ if (file.getName().contains("dfrg")) {
+ if (blockCdl.getCount() == 0) {
+ try {
+ waitCdl.await();
+ }
+ catch (InterruptedException ignore) {
+ // No-op.
+ }
+ }
+ else
+ blockCdl.countDown();
+ }
+
+ return delegate.create(file, modes);
+ });
+
+ return cfg;
+ };
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
+ try {
+ startGrid(0, cfgOp);
+ }
+ catch (Exception e) {
+ // Propagate the failure to the async future.
+ throw new RuntimeException(e);
+ }
+ });
+
+ blockCdl.await();
+
+ List<LogListener> logLsnrs = Arrays.asList(
+ LogListener.matches("default1 - size before/after: 0MB/0MB").build(),
+ LogListener.matches("default2 - partitions processed/all:").build(),
+ LogListener.matches("Awaiting defragmentation: default3").build()
+ );
+
+ for (LogListener logLsnr : logLsnrs)
+ testLog.registerListener(logLsnr);
+
+ assertEquals(EXIT_CODE_OK, execute(
+ cmd,
+ "--port",
+ port,
+ "--defragmentation",
+ "status"
+ ));
+
+ waitCdl.countDown();
+
+ for (LogListener logLsnr : logLsnrs)
+ assertTrue(logLsnr.check());
+
+ fut.get();
+
+ ((GridCacheDatabaseSharedManager)grid(0).context().cache().context().database())
+ .defragmentationManager()
+ .completionFuture()
+ .get();
+
+ testLog.clearListeners();
+
+ logLsnrs = Arrays.asList(
+ LogListener.matches("default1 - size before/after: 0MB/0MB").build(),
+ LogListener.matches(Pattern.compile("default2 - size before/after: (\\S+)/\\1")).build(),
+ LogListener.matches("default3 - size before/after: 0MB/0MB").build()
+ );
+
+ for (LogListener logLsnr : logLsnrs)
+ testLog.registerListener(logLsnr);
+
+ assertEquals(EXIT_CODE_OK, execute(
+ cmd,
+ "--port",
+ port,
+ "--defragmentation",
+ "status"
+ ));
+
+ for (LogListener logLsnr : logLsnrs)
+ assertTrue(logLsnr.check());
+ }
+
/** */
private CommandHandler createCommandHandler(ListeningTestLogger testLog) {
Logger log = CommandHandler.initLogger(null);
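A detail worth noting in the second set of listeners above is the backreference in the default2 pattern: "(\\S+)/\\1" only matches when the reported before and after sizes are printed as the same token. A tiny self-contained illustration of that regex behaviour (the size values are made up for the example):

    import java.util.regex.Pattern;

    public class BackrefCheck {
        public static void main(String[] args) {
            Pattern p = Pattern.compile("default2 - size before/after: (\\S+)/\\1");

            // Matches: "before" and "after" are identical tokens.
            System.out.println(p.matcher("default2 - size before/after: 0.07MB/0.07MB").find()); // true

            // Does not match: the two sizes differ.
            System.out.println(p.matcher("default2 - size before/after: 0.07MB/0.05MB").find()); // false
        }
    }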
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
index 41999fb..75b3458 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
@@ -19,11 +19,16 @@ package org.apache.ignite.internal.processors.cache.persistence.defragmentation;
import java.io.File;
import java.nio.file.Path;
+import java.text.DecimalFormat;
+import java.text.DecimalFormatSymbols;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -66,6 +71,7 @@ import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
import org.apache.ignite.internal.processors.cache.tree.PendingRow;
import org.apache.ignite.internal.processors.query.GridQueryIndexing;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.collection.IntHashMap;
import org.apache.ignite.internal.util.collection.IntMap;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
@@ -78,9 +84,11 @@ import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.maintenance.MaintenanceRegistry;
+import static java.util.Comparator.comparing;
import static java.util.stream.StreamSupport.stream;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DEFRAGMENTATION_MAPPING_REGION_NAME;
import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DEFRAGMENTATION_PART_REGION_NAME;
@@ -105,6 +113,9 @@ public class CachePartitionDefragmentationManager {
/** */
private final Set<String> cachesForDefragmentation;
+ /** */
+ private final Set<CacheGroupContext> cacheGrpCtxsForDefragmentation = new TreeSet<>(comparing(CacheGroupContext::cacheOrGroupName));
+
/** Cache shared context. */
private final GridCacheSharedContext<?, ?> sharedCtx;
@@ -142,6 +153,9 @@ public class CachePartitionDefragmentationManager {
private final AtomicBoolean cancel = new AtomicBoolean();
/** */
+ private final DefragmentationStatus status = new DefragmentationStatus();
+
+ /** */
private final GridFutureAdapter<?> completionFut = new GridFutureAdapter<>();
/**
@@ -190,11 +204,23 @@ public class CachePartitionDefragmentationManager {
dbMgr.preserveWalTailPointer();
sharedCtx.wal().onDeActivate(sharedCtx.kernalContext());
+
+ for (CacheGroupContext oldGrpCtx : sharedCtx.cache().cacheGroups()) {
+ if (!oldGrpCtx.userCache() || cacheGrpCtxsForDefragmentation.contains(oldGrpCtx))
+ continue;
+
+ if (!cachesForDefragmentation.isEmpty()) {
+ if (oldGrpCtx.caches().stream().noneMatch(cctx -> cachesForDefragmentation.contains(cctx.name())))
+ continue;
+ }
+
+ cacheGrpCtxsForDefragmentation.add(oldGrpCtx);
+ }
}
/** */
public void executeDefragmentation() throws IgniteCheckedException {
- log.info("Defragmentation started.");
+ status.onStart(cacheGrpCtxsForDefragmentation);
try {
// Now the actual process starts.
@@ -203,23 +229,18 @@ public class CachePartitionDefragmentationManager {
IgniteInternalFuture<?> idxDfrgFut = null;
DataPageEvictionMode prevPageEvictionMode = null;
- for (CacheGroupContext oldGrpCtx : sharedCtx.cache().cacheGroups()) {
- if (!oldGrpCtx.userCache())
- continue;
-
+ for (CacheGroupContext oldGrpCtx : cacheGrpCtxsForDefragmentation) {
int grpId = oldGrpCtx.groupId();
- if (!cachesForDefragmentation.isEmpty()) {
- if (oldGrpCtx.caches().stream().noneMatch(cctx -> cachesForDefragmentation.contains(cctx.name())))
- continue;
- }
-
File workDir = filePageStoreMgr.cacheWorkDir(oldGrpCtx.sharedGroup(), oldGrpCtx.cacheOrGroupName());
- try {
- if (skipAlreadyDefragmentedCacheGroup(workDir, grpId, log))
- continue;
+ if (skipAlreadyDefragmentedCacheGroup(workDir, grpId, log)) {
+ status.onCacheGroupSkipped(oldGrpCtx);
+ continue;
+ }
+
+ try {
GridCacheOffheapManager offheap = (GridCacheOffheapManager)oldGrpCtx.offheap();
List<CacheDataStore> oldCacheDataStores = stream(offheap.cacheDataStores().spliterator(), false)
@@ -233,226 +254,243 @@ public class CachePartitionDefragmentationManager {
})
.collect(Collectors.toList());
- if (workDir != null && !oldCacheDataStores.isEmpty()) {
- // We can't start defragmentation of new group on the region that has wrong eviction mode.
- // So waiting of the previous cache group defragmentation is inevitable.
- DataPageEvictionMode curPageEvictionMode = oldGrpCtx.dataRegion().config().getPageEvictionMode();
+ status.onCacheGroupStart(oldGrpCtx, oldCacheDataStores.size());
- if (prevPageEvictionMode == null || prevPageEvictionMode != curPageEvictionMode) {
- prevPageEvictionMode = curPageEvictionMode;
+ if (workDir == null || oldCacheDataStores.isEmpty()) {
+ status.onCacheGroupFinish(oldGrpCtx);
- partDataRegion.config().setPageEvictionMode(curPageEvictionMode);
+ continue;
+ }
- if (idxDfrgFut != null)
- idxDfrgFut.get();
- }
+ // We can't start defragmentation of new group on the region that has wrong eviction mode.
+ // So waiting of the previous cache group defragmentation is inevitable.
+ DataPageEvictionMode curPageEvictionMode = oldGrpCtx.dataRegion().config().getPageEvictionMode();
- IntMap<CacheDataStore> cacheDataStores = new IntHashMap<>();
+ if (prevPageEvictionMode == null || prevPageEvictionMode != curPageEvictionMode) {
+ prevPageEvictionMode = curPageEvictionMode;
- for (CacheDataStore store : offheap.cacheDataStores()) {
- // Tree can be null for not yet initialized partitions.
- // This would mean that these partitions are empty.
- assert store.tree() == null || store.tree().groupId() == grpId;
+ partDataRegion.config().setPageEvictionMode(curPageEvictionMode);
- if (store.tree() != null)
- cacheDataStores.put(store.partId(), store);
- }
+ if (idxDfrgFut != null)
+ idxDfrgFut.get();
+ }
- dbMgr.checkpointedDataRegions().remove(oldGrpCtx.dataRegion());
+ IntMap<CacheDataStore> cacheDataStores = new IntHashMap<>();
- // Another cheat. Ttl cleanup manager knows too much shit.
- oldGrpCtx.caches().stream()
- .filter(cacheCtx -> cacheCtx.groupId() == grpId)
- .forEach(cacheCtx -> cacheCtx.ttl().unregister());
+ for (CacheDataStore store : offheap.cacheDataStores()) {
+ // Tree can be null for not yet initialized partitions.
+ // This would mean that these partitions are empty.
+ assert store.tree() == null || store.tree().groupId() == grpId;
- // Technically wal is already disabled, but "PageHandler.isWalDeltaRecordNeeded" doesn't care
- // and WAL records will be allocated anyway just to be ignored later if we don't disable WAL for
- // cache group explicitly.
- oldGrpCtx.localWalEnabled(false, false);
+ if (store.tree() != null)
+ cacheDataStores.put(store.partId(), store);
+ }
- boolean encrypted = oldGrpCtx.config().isEncryptionEnabled();
+ dbMgr.checkpointedDataRegions().remove(oldGrpCtx.dataRegion());
- FilePageStoreFactory pageStoreFactory = filePageStoreMgr.getPageStoreFactory(grpId, encrypted);
+ // Another cheat. Ttl cleanup manager knows too much shit.
+ oldGrpCtx.caches().stream()
+ .filter(cacheCtx -> cacheCtx.groupId() == grpId)
+ .forEach(cacheCtx -> cacheCtx.ttl().unregister());
- createIndexPageStore(grpId, workDir, pageStoreFactory, partDataRegion, val -> {
- }); //TODO Allocated tracker.
+ // Technically wal is already disabled, but "PageHandler.isWalDeltaRecordNeeded" doesn't care
+ // and WAL records will be allocated anyway just to be ignored later if we don't disable WAL for
+ // cache group explicitly.
+ oldGrpCtx.localWalEnabled(false, false);
- checkCancellation();
+ boolean encrypted = oldGrpCtx.config().isEncryptionEnabled();
- GridCompoundFuture<Object, Object> cmpFut = new GridCompoundFuture<>();
+ FilePageStoreFactory pageStoreFactory = filePageStoreMgr.getPageStoreFactory(grpId, encrypted);
- PageMemoryEx oldPageMem = (PageMemoryEx)oldGrpCtx.dataRegion().pageMemory();
+ AtomicLong idxAllocationTracker = new GridAtomicLong();
- CacheGroupContext newGrpCtx = new CacheGroupContext(
- sharedCtx,
- grpId,
- oldGrpCtx.receivedFrom(),
- CacheType.USER,
- oldGrpCtx.config(),
- oldGrpCtx.affinityNode(),
- partDataRegion,
- oldGrpCtx.cacheObjectContext(),
- null,
- null,
- oldGrpCtx.localStartVersion(),
- true,
- false,
- true
- );
+ createIndexPageStore(grpId, workDir, pageStoreFactory, partDataRegion, idxAllocationTracker::addAndGet);
- defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadLock();
+ checkCancellation();
- try {
- // This will initialize partition meta in index partition - meta tree and reuse list.
- newGrpCtx.start();
- }
- finally {
- defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock();
- }
+ GridCompoundFuture<Object, Object> cmpFut = new GridCompoundFuture<>();
- IntMap<LinkMap> linkMapByPart = new IntHashMap<>();
+ PageMemoryEx oldPageMem = (PageMemoryEx)oldGrpCtx.dataRegion().pageMemory();
- for (CacheDataStore oldCacheDataStore : oldCacheDataStores) {
- checkCancellation();
+ CacheGroupContext newGrpCtx = new CacheGroupContext(
+ sharedCtx,
+ grpId,
+ oldGrpCtx.receivedFrom(),
+ CacheType.USER,
+ oldGrpCtx.config(),
+ oldGrpCtx.affinityNode(),
+ partDataRegion,
+ oldGrpCtx.cacheObjectContext(),
+ null,
+ null,
+ oldGrpCtx.localStartVersion(),
+ true,
+ false,
+ true
+ );
- int partId = oldCacheDataStore.partId();
+ defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadLock();
- PartitionContext partCtx = new PartitionContext(
- workDir,
- grpId,
- partId,
- partDataRegion,
- mappingDataRegion,
- oldGrpCtx,
- newGrpCtx,
- cacheDataStores.get(partId),
- pageStoreFactory
- );
+ try {
+ // This will initialize partition meta in index partition - meta tree and reuse list.
+ newGrpCtx.start();
+ }
+ finally {
+ defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock();
+ }
- if (skipAlreadyDefragmentedPartition(workDir, grpId, partId, log)) {
- partCtx.createPageStore(
- () -> defragmentedPartMappingFile(workDir, partId).toPath(),
- partCtx.mappingPagesAllocated,
- partCtx.mappingPageMemory
- );
+ IntMap<LinkMap> linkMapByPart = new IntHashMap<>();
- linkMapByPart.put(partId, partCtx.createLinkMapTree(false));
+ for (CacheDataStore oldCacheDataStore : oldCacheDataStores) {
+ checkCancellation();
- continue;
- }
+ int partId = oldCacheDataStore.partId();
+
+ PartitionContext partCtx = new PartitionContext(
+ workDir,
+ grpId,
+ partId,
+ partDataRegion,
+ mappingDataRegion,
+ oldGrpCtx,
+ newGrpCtx,
+ cacheDataStores.get(partId),
+ pageStoreFactory
+ );
+ if (skipAlreadyDefragmentedPartition(workDir, grpId, partId, log)) {
partCtx.createPageStore(
() -> defragmentedPartMappingFile(workDir, partId).toPath(),
partCtx.mappingPagesAllocated,
partCtx.mappingPageMemory
);
- linkMapByPart.put(partId, partCtx.createLinkMapTree(true));
+ linkMapByPart.put(partId, partCtx.createLinkMapTree(false));
- checkCancellation();
+ continue;
+ }
- partCtx.createPageStore(
- () -> defragmentedPartTmpFile(workDir, partId).toPath(),
- partCtx.partPagesAllocated,
- partCtx.partPageMemory
- );
+ partCtx.createPageStore(
+ () -> defragmentedPartMappingFile(workDir, partId).toPath(),
+ partCtx.mappingPagesAllocated,
+ partCtx.mappingPageMemory
+ );
+
+ linkMapByPart.put(partId, partCtx.createLinkMapTree(true));
+
+ checkCancellation();
+
+ partCtx.createPageStore(
+ () -> defragmentedPartTmpFile(workDir, partId).toPath(),
+ partCtx.partPagesAllocated,
+ partCtx.partPageMemory
+ );
partCtx.createNewCacheDataStore(offheap);
copyPartitionData(partCtx, treeIter);
- IgniteInClosure<IgniteInternalFuture<?>> cpLsnr = fut -> {
- if (fut.error() != null)
- return;
+ DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partCtx.partPageMemory.pageManager();
- PageStore oldPageStore = null;
+ PageStore oldPageStore = filePageStoreMgr.getStore(grpId, partId);
- try {
- oldPageStore = filePageStoreMgr.getStore(grpId, partId);
- }
- catch (IgniteCheckedException ignore) {
- }
+ status.onPartitionDefragmented(
+ oldGrpCtx,
+ oldPageStore.size(),
+ pageSize + partCtx.partPagesAllocated.get() * pageSize // + file header.
+ );
- assert oldPageStore != null;
+ //TODO Move inside of defragmentSinglePartition.
+ IgniteInClosure<IgniteInternalFuture<?>> cpLsnr = fut -> {
+ if (fut.error() != null)
+ return;
+
+ if (log.isDebugEnabled()) {
+ log.debug(S.toString(
+ "Partition defragmented",
+ "grpId", grpId, false,
+ "partId", partId, false,
+ "oldPages", oldPageStore.pages(), false,
+ "newPages", partCtx.partPagesAllocated.get() + 1, false,
+ "mappingPages", partCtx.mappingPagesAllocated.get() + 1, false,
+ "pageSize", pageSize, false,
+ "partFile", defragmentedPartFile(workDir, partId).getName(), false,
+ "workDir", workDir, false
+ ));
+ }
- if (log.isDebugEnabled()) {
- log.debug(S.toString(
- "Partition defragmented",
- "grpId", grpId, false,
- "partId", partId, false,
- "oldPages", oldPageStore.pages(), false,
- "newPages", partCtx.partPagesAllocated.get() + 1, false,
- "mappingPages", partCtx.mappingPagesAllocated.get() + 1, false,
- "pageSize", pageSize, false,
- "partFile", defragmentedPartFile(workDir, partId).getName(), false,
- "workDir", workDir, false
- ));
- }
+ oldPageMem.invalidate(grpId, partId);
- oldPageMem.invalidate(grpId, partId);
+ partCtx.partPageMemory.invalidate(grpId, partId);
- partCtx.partPageMemory.invalidate(grpId, partId);
+ pageMgr.pageStoreMap().removePageStore(grpId, partId); // Yes, it'll be invalid in a second.
- DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partCtx.partPageMemory.pageManager();
+ renameTempPartitionFile(workDir, partId);
+ };
- pageMgr.pageStoreMap().removePageStore(grpId, partId); // Yes, it'll be invalid in a second.
+ GridFutureAdapter<?> cpFut = defragmentationCheckpoint
+ .forceCheckpoint("partition defragmented", null)
+ .futureFor(CheckpointState.FINISHED);
- renameTempPartitionFile(workDir, partId);
- };
+ cpFut.listen(cpLsnr);
- GridFutureAdapter<?> cpFut = defragmentationCheckpoint
- .forceCheckpoint("partition defragmented", null)
- .futureFor(CheckpointState.FINISHED);
+ cmpFut.add((IgniteInternalFuture<Object>)cpFut);
+ }
- cpFut.listen(cpLsnr);
+ // A bit too general for now, but I like it more then saving only the last checkpoint future.
+ cmpFut.markInitialized().get();
- cmpFut.add((IgniteInternalFuture<Object>)cpFut);
- }
+ idxDfrgFut = new GridFinishedFuture<>();
- // A bit too general for now, but I like it more then saving only the last checkpoint future.
- cmpFut.markInitialized().get();
+ if (filePageStoreMgr.hasIndexStore(grpId)) {
+ defragmentIndexPartition(oldGrpCtx, newGrpCtx, linkMapByPart);
- idxDfrgFut = new GridFinishedFuture<>();
+ idxDfrgFut = defragmentationCheckpoint
+ .forceCheckpoint("index defragmented", null)
+ .futureFor(CheckpointState.FINISHED);
+ }
- if (filePageStoreMgr.hasIndexStore(grpId)) {
- defragmentIndexPartition(oldGrpCtx, newGrpCtx, linkMapByPart);
+ idxDfrgFut = idxDfrgFut.chain(fut -> {
+ oldPageMem.invalidate(grpId, PageIdAllocator.INDEX_PARTITION);
- idxDfrgFut = defragmentationCheckpoint
- .forceCheckpoint("index defragmented", null)
- .futureFor(CheckpointState.FINISHED);
- }
+ PageMemoryEx partPageMem = (PageMemoryEx)partDataRegion.pageMemory();
- idxDfrgFut = idxDfrgFut.chain(fut -> {
- oldPageMem.invalidate(grpId, PageIdAllocator.INDEX_PARTITION);
+ partPageMem.invalidate(grpId, PageIdAllocator.INDEX_PARTITION);
- PageMemoryEx partPageMem = (PageMemoryEx)partDataRegion.pageMemory();
+ DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partPageMem.pageManager();
- partPageMem.invalidate(grpId, PageIdAllocator.INDEX_PARTITION);
+ pageMgr.pageStoreMap().removePageStore(grpId, PageIdAllocator.INDEX_PARTITION);
- DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partPageMem.pageManager();
+ PageMemoryEx mappingPageMem = (PageMemoryEx)mappingDataRegion.pageMemory();
- pageMgr.pageStoreMap().removePageStore(grpId, PageIdAllocator.INDEX_PARTITION);
+ pageMgr = (DefragmentationPageReadWriteManager)mappingPageMem.pageManager();
- PageMemoryEx mappingPageMem = (PageMemoryEx)mappingDataRegion.pageMemory();
+ pageMgr.pageStoreMap().clear(grpId);
- pageMgr = (DefragmentationPageReadWriteManager)mappingPageMem.pageManager();
+ renameTempIndexFile(workDir);
- pageMgr.pageStoreMap().clear(grpId);
+ writeDefragmentationCompletionMarker(filePageStoreMgr.getPageStoreFileIoFactory(), workDir, log);
- renameTempIndexFile(workDir);
+ batchRenameDefragmentedCacheGroupPartitions(workDir, log);
- writeDefragmentationCompletionMarker(filePageStoreMgr.getPageStoreFileIoFactory(), workDir, log);
+ return null;
+ });
- batchRenameDefragmentedCacheGroupPartitions(workDir, log);
- return null;
- });
- }
+ PageStore oldIdxPageStore = filePageStoreMgr.getStore(grpId, INDEX_PARTITION);
+
+ status.onIndexDefragmented(
+ oldGrpCtx,
+ oldIdxPageStore.size(),
+ pageSize + idxAllocationTracker.get() * pageSize // + file header.
+ );
}
catch (DefragmentationCancelledException e) {
DefragmentationFileUtils.deleteLeftovers(workDir);
throw e;
}
+
+ status.onCacheGroupFinish(oldGrpCtx);
}
if (idxDfrgFut != null)
@@ -460,18 +498,24 @@ public class CachePartitionDefragmentationManager {
mntcReg.unregisterMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME);
- log.info("Defragmentation completed. All partitions are defragmented.");
+ status.onFinish();
completionFut.onDone();
}
catch (DefragmentationCancelledException e) {
mntcReg.unregisterMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME);
- log.info("Defragmentation has been cancelled.");
+ log.info("Defragmentation process has been cancelled.");
+
+ status.onFinish();
completionFut.onDone();
}
catch (Throwable t) {
+ log.error("Defragmentation process failed.", t);
+
+ status.onFinish();
+
completionFut.onDone(t);
throw t;
@@ -553,7 +597,7 @@ public class CachePartitionDefragmentationManager {
/** */
public String status() {
- throw new UnsupportedOperationException("Not implemented yet.");
+ return status.toString();
}
/**
@@ -918,4 +962,209 @@ public class CachePartitionDefragmentationManager {
/** Serial version uid. */
private static final long serialVersionUID = 0L;
}
+
+ /** */
+ private class DefragmentationStatus {
+ /** */
+ private long startTs;
+
+ /** */
+ private long finishTs;
+
+ /** */
+ private final Set<String> scheduledGroups = new TreeSet<>();
+
+ /** */
+ private final Map<CacheGroupContext, DefragmentationCacheGroupProgress> progressGroups
+ = new TreeMap<>(comparing(CacheGroupContext::cacheOrGroupName));
+
+ /** */
+ private final Map<CacheGroupContext, DefragmentationCacheGroupProgress> finishedGroups
+ = new TreeMap<>(comparing(CacheGroupContext::cacheOrGroupName));
+
+ /** */
+ private final Set<String> skippedGroups = new TreeSet<>();
+
+ /** */
+ public synchronized void onStart(Set<CacheGroupContext> scheduledGroups) {
+ startTs = System.currentTimeMillis();
+
+ for (CacheGroupContext grp : scheduledGroups) {
+ this.scheduledGroups.add(grp.cacheOrGroupName());
+ }
+
+ log.info("Defragmentation started.");
+ }
+
+ /** */
+ public synchronized void onCacheGroupStart(CacheGroupContext grpCtx, int parts) {
+ scheduledGroups.remove(grpCtx.cacheOrGroupName());
+
+ progressGroups.put(grpCtx, new DefragmentationCacheGroupProgress(parts));
+ }
+
+ /** */
+ public synchronized void onPartitionDefragmented(CacheGroupContext grpCtx, long oldSize, long newSize) {
+ progressGroups.get(grpCtx).onPartitionDefragmented(oldSize, newSize);
+ }
+
+ /** */
+ public synchronized void onIndexDefragmented(CacheGroupContext grpCtx, long oldSize, long newSize) {
+ progressGroups.get(grpCtx).onIndexDefragmented(oldSize, newSize);
+ }
+
+ /** */
+ public synchronized void onCacheGroupFinish(CacheGroupContext grpCtx) {
+ DefragmentationCacheGroupProgress progress = progressGroups.remove(grpCtx);
+
+ progress.onFinish();
+
+ finishedGroups.put(grpCtx, progress);
+ }
+
+ /** */
+ public synchronized void onCacheGroupSkipped(CacheGroupContext grpCtx) {
+ scheduledGroups.remove(grpCtx.cacheOrGroupName());
+
+ skippedGroups.add(grpCtx.cacheOrGroupName());
+ }
+
+ /** */
+ public synchronized void onFinish() {
+ finishTs = System.currentTimeMillis();
+
+ progressGroups.clear();
+
+ scheduledGroups.clear();
+
+ log.info("Defragmentation process completed. Time: " + (finishTs - startTs) * 1e-3 + "s.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized String toString() {
+ StringBuilder sb = new StringBuilder();
+
+ if (!finishedGroups.isEmpty()) {
+ sb.append("Defragmentation is completed for cache groups:\n");
+
+ for (Map.Entry<CacheGroupContext, DefragmentationCacheGroupProgress> entry : finishedGroups.entrySet()) {
+ sb.append(" ").append(entry.getKey().cacheOrGroupName()).append(" - ");
+
+ sb.append(entry.getValue().toString()).append('\n');
+ }
+ }
+
+ if (!progressGroups.isEmpty()) {
+ sb.append("Defragmentation is in progress for cache groups:\n");
+
+ for (Map.Entry<CacheGroupContext, DefragmentationCacheGroupProgress> entry : progressGroups.entrySet()) {
+ sb.append(" ").append(entry.getKey().cacheOrGroupName()).append(" - ");
+
+ sb.append(entry.getValue().toString()).append('\n');
+ }
+ }
+
+ if (!skippedGroups.isEmpty())
+ sb.append("Skipped cache groups: ").append(String.join(", ", skippedGroups)).append('\n');
+
+ if (!scheduledGroups.isEmpty())
+ sb.append("Awaiting defragmentation: ").append(String.join(", ", scheduledGroups)).append('\n');
+
+ return sb.toString();
+ }
+ }
+
+ /** */
+ private static class DefragmentationCacheGroupProgress {
+ /** */
+ private static final DecimalFormat MB_FORMAT = new DecimalFormat(
+ "#.##",
+ DecimalFormatSymbols.getInstance(Locale.US)
+ );
+
+ /** */
+ private final int partsTotal;
+
+ /** */
+ private int partsCompleted;
+
+ /** */
+ private long oldSize;
+
+ /** */
+ private long newSize;
+
+ /** */
+ private final long startTs;
+
+ /** */
+ private long finishTs;
+
+ /** */
+ public DefragmentationCacheGroupProgress(int parts) {
+ partsTotal = parts;
+
+ startTs = System.currentTimeMillis();
+ }
+
+ /**
+ * @param oldSize Old partition size.
+ * @param newSize New partition size.
+ */
+ public void onPartitionDefragmented(long oldSize, long newSize) {
+ ++partsCompleted;
+
+ this.oldSize += oldSize;
+ this.newSize += newSize;
+ }
+
+ /**
+ * @param oldSize Old partition size.
+ * @param newSize New partition size.
+ */
+ public void onIndexDefragmented(long oldSize, long newSize) {
+ this.oldSize += oldSize;
+ this.newSize += newSize;
+ }
+
+ /** */
+ public void onFinish() {
+ finishTs = System.currentTimeMillis();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ StringBuilder sb = new StringBuilder();
+
+ if (finishTs == 0) {
+ sb.append("partitions processed/all: ").append(partsCompleted).append("/").append(partsTotal);
+
+ sb.append(", time elapsed: ");
+
+ appendDuration(sb, System.currentTimeMillis());
+ }
+ else {
+ double mb = 1024 * 1024;
+
+ sb.append("size before/after: ").append(MB_FORMAT.format(oldSize / mb)).append("MB/");
+ sb.append(MB_FORMAT.format(newSize / mb)).append("MB");
+
+ sb.append(", time took: ");
+
+ appendDuration(sb, finishTs);
+ }
+
+ return sb.toString();
+ }
+
+ /** */
+ private void appendDuration(StringBuilder sb, long end) {
+ long duration = Math.round((end - startTs) * 1e-3);
+
+ long mins = duration / 60;
+ long secs = duration % 60;
+
+ sb.append(mins).append(" mins ").append(secs).append(" secs");
+ }
+ }
}
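For completeness: the text built by the new DefragmentationStatus is what CachePartitionDefragmentationManager#status() now returns, so it can also be read programmatically on the node that is being defragmented. A minimal sketch, assuming an IgniteEx instance for the local node (the cast is the same one used in the test; the printed lines, e.g. "default2 - partitions processed/all: 3/8, time elapsed: 0 mins 12 secs", are illustrative):

    import org.apache.ignite.internal.IgniteEx;
    import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
    import org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager;

    public class DefragmentationStatusProbe {
        /** Prints the local node's defragmentation progress, if a defragmentation is running. */
        public static void printStatus(IgniteEx ignite) {
            GridCacheDatabaseSharedManager db =
                (GridCacheDatabaseSharedManager)ignite.context().cache().context().database();

            CachePartitionDefragmentationManager mgr = db.defragmentationManager();

            if (mgr != null)
                System.out.println(mgr.status()); // Same text the "--defragmentation status" command reports.
            else
                System.out.println("Defragmentation is not in progress on this node.");
        }
    }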