You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain text copy.
Posted to commits@ignite.apache.org by al...@apache.org on 2021/03/26 10:35:46 UTC

[ignite] branch master updated: IGNITE-14222 Exclude evicted partitions from re-encryption - Fixes #8846.

This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 47a9014  IGNITE-14222 Exclude evicted partitions from re-encryption - Fixes #8846.
47a9014 is described below

commit 47a90145ae2e2788f53775df8da5edf607b14fb3
Author: Pavel Pereslegin <xx...@gmail.com>
AuthorDate: Fri Mar 26 13:18:17 2021 +0300

    IGNITE-14222 Exclude evicted partitions from re-encryption - Fixes #8846.
    
    Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
 .../managers/encryption/CacheGroupPageScanner.java | 234 +++++++++++++--------
 .../managers/encryption/GridEncryptionManager.java |   8 +
 .../GridCacheDatabaseSharedManager.java            |   5 +-
 .../cache/persistence/GridCacheOffheapManager.java |  15 +-
 .../persistence/checkpoint/CheckpointManager.java  |   6 +-
 .../cache/persistence/checkpoint/Checkpointer.java |  37 ++--
 .../checkpoint/LightweightCheckpointManager.java   |   6 +-
 .../checkpoint/PartitionDestroyQueue.java          |  18 +-
 .../encryption/AbstractEncryptionTest.java         |   2 +
 .../encryption/CacheGroupReencryptionTest.java     |   9 +-
 10 files changed, 211 insertions(+), 129 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/CacheGroupPageScanner.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/CacheGroupPageScanner.java
index c2a6895..81ff3e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/CacheGroupPageScanner.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/CacheGroupPageScanner.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.pagemem.PageIdAllocator;
 import org.apache.ignite.internal.pagemem.PageIdUtils;
 import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
 import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
@@ -150,12 +151,14 @@ public class CacheGroupPageScanner implements CheckpointListener {
 
                 try {
                     for (GroupScanTask grpScanTask : completeCandidates) {
-                        grps.remove(grpScanTask.groupId());
+                        grps.remove(grpScanTask.group().groupId());
 
                         grpScanTask.onDone();
 
-                        if (log.isInfoEnabled())
-                            log.info("Cache group reencryption is finished [grpId=" + grpScanTask.groupId() + "]");
+                        if (log.isInfoEnabled()) {
+                            log.info("Cache group reencryption is finished [grp=" +
+                                grpScanTask.group().cacheOrGroupName() + "]");
+                        }
                     }
 
                     if (!grps.isEmpty())
@@ -191,6 +194,8 @@ public class CacheGroupPageScanner implements CheckpointListener {
             return new GridFinishedFuture<>();
         }
 
+        GroupScanTask grpScanTask = new GroupScanTask(grp);
+
         lock.lock();
 
         try {
@@ -209,39 +214,36 @@ public class CacheGroupPageScanner implements CheckpointListener {
                 return prevState;
             }
 
-            Set<Integer> parts = new HashSet<>();
-            long[] pagesLeft = new long[1];
-
-            forEachPageStore(grp, new IgniteInClosureX<Integer>() {
-                @Override public void applyx(Integer partId) {
-                    long encState = ctx.encryption().getEncryptionState(grpId, partId);
+            grps.put(grpId, grpScanTask);
+        } finally {
+            lock.unlock();
+        }
 
-                    if (encState == 0) {
-                        if (log.isDebugEnabled())
-                            log.debug("Skipping partition reencryption [grp=" + grpId + ", p=" + partId + "]");
+        singleExecSvc.submit(() -> schedule0(grpScanTask));
 
-                        return;
-                    }
+        return grpScanTask;
+    }
 
-                    parts.add(partId);
+    /**
+     * @param grpScanTask Cache group scan task.
+     */
+    private void schedule0(GroupScanTask grpScanTask) {
+        try {
+            forEachPageStore(grpScanTask.group(), new IgniteInClosureX<Integer>() {
+                @Override public void applyx(Integer partId) {
+                    long encState = ctx.encryption().getEncryptionState(grpScanTask.group().groupId(), partId);
 
-                    pagesLeft[0] += (ReencryptStateUtils.pageCount(encState) - ReencryptStateUtils.pageIndex(encState));
+                    if (encState != 0)
+                        grpScanTask.addPartition(partId, encState);
                 }
             });
 
-            GroupScanTask grpScan = new GroupScanTask(grp, parts, pagesLeft[0]);
-
-            singleExecSvc.submit(grpScan);
-
             if (log.isInfoEnabled())
-                log.info("Scheduled reencryption [grpId=" + grpId + "]");
+                log.info("Scheduled reencryption [grp=" + grpScanTask.group().cacheOrGroupName() + "]");
 
-            grps.put(grpId, grpScan);
-
-            return grpScan;
-        }
-        finally {
-            lock.unlock();
+            grpScanTask.checkComplete();
+        } catch (IgniteCheckedException e) {
+            grpScanTask.onDone(e);
         }
     }
 
@@ -291,6 +293,17 @@ public class CacheGroupPageScanner implements CheckpointListener {
     }
 
     /**
+     * @param grpId Cache group ID.
+     * @param partId Partition ID.
+     */
+    public void includePartition(int grpId, int partId) {
+        GroupScanTask grpScanTask = grps.get(grpId);
+
+        if (grpScanTask != null)
+            grpScanTask.schedulePending(partId);
+    }
+
+    /**
      * Collect current number of pages in the specified cache group.
      *
      * @param grp Cache group.
@@ -385,30 +398,42 @@ public class CacheGroupPageScanner implements CheckpointListener {
     /**
      * Cache group partition scanning task.
      */
-    private class GroupScanTask extends GridFutureAdapter<Void> implements Runnable {
+    private class GroupScanTask extends GridFutureAdapter<Void> {
         /** Cache group ID. */
         private final CacheGroupContext grp;
 
-        /** Partition IDs. */
-        private final Set<Integer> parts;
+        /** Partitions to reencrypt. */
+        private final Set<Integer> parts = new GridConcurrentHashSet<>();
 
         /** Page memory. */
         private final PageMemoryEx pageMem;
 
         /** Total memory pages left for reencryption. */
-        private final AtomicLong remainingPagesCntr;
+        private final AtomicLong remainingPagesCntr = new AtomicLong();
 
         /**
          * @param grp Cache group.
          */
-        public GroupScanTask(CacheGroupContext grp, Set<Integer> parts, long remainingPagesCnt) {
+        public GroupScanTask(CacheGroupContext grp) {
             this.grp = grp;
-            this.parts = new GridConcurrentHashSet<>(parts);
 
-            remainingPagesCntr = new AtomicLong(remainingPagesCnt);
             pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
         }
 
+        /**
+         * Schedule partition re-encryption.
+         *
+         * @param partId Partition ID.
+         * @param state Reencryption status.
+         */
+        public synchronized void addPartition(int partId, long state) {
+            remainingPagesCntr.addAndGet(ReencryptStateUtils.pageCount(state) - ReencryptStateUtils.pageIndex(state));
+
+            parts.add(partId);
+
+            schedulePartitionScan(partId);
+        }
+
         /** {@inheritDoc} */
         @Override public synchronized boolean cancel() throws IgniteCheckedException {
             return onCancelled();
@@ -421,18 +446,36 @@ public class CacheGroupPageScanner implements CheckpointListener {
          * @return {@code True} if reencryption was cancelled.
          */
         public synchronized boolean excludePartition(int partId) {
-            long state = ctx.encryption().getEncryptionState(groupId(), partId);
+            if (!parts.remove(partId))
+                return false;
+
+            long state = ctx.encryption().getEncryptionState(grp.groupId(), partId);
 
-            remainingPagesCntr.addAndGet(ReencryptStateUtils.pageIndex(state) - ReencryptStateUtils.pageCount(state));
+            long pagesLeft = remainingPagesCntr.addAndGet(
+                ReencryptStateUtils.pageIndex(state) - ReencryptStateUtils.pageCount(state));
 
-            return parts.remove(partId);
+            if (pagesLeft == 0)
+                checkComplete();
+
+            return true;
         }
 
         /**
-         * @return Cache group ID.
+         * @param partId Partition ID.
          */
-        public int groupId() {
-            return grp.groupId();
+        public synchronized void schedulePending(int partId) {
+            if (isDone())
+                return;
+
+            if (parts.contains(partId))
+                schedulePartitionScan(partId);
+        }
+
+        /**
+         * @return Cache group context.
+         */
+        public CacheGroupContext group() {
+            return grp;
         }
 
         /**
@@ -442,74 +485,91 @@ public class CacheGroupPageScanner implements CheckpointListener {
             return remainingPagesCntr.get();
         }
 
-        /** {@inheritDoc} */
-        @Override public void run() {
-            try {
-                for (int partId : parts) {
-                    long state = ctx.encryption().getEncryptionState(grp.groupId(), partId);
-
-                    if (state == 0)
-                        continue;
-
-                    scanPartition(partId, ReencryptStateUtils.pageIndex(state), ReencryptStateUtils.pageCount(state));
+        /**
+         * @param partId Partition ID.
+         */
+        private void schedulePartitionScan(int partId) {
+            singleExecSvc.submit(() -> scanPartition(partId));
+        }
 
-                    if (isDone())
-                        return;
-                }
+        /**
+         * Check the completeness of the cache group scan.
+         */
+        private synchronized void checkComplete() {
+            if (!isDone() && parts.isEmpty() && !cpWaitGrps.contains(this))
+                cpWaitGrps.add(this);
+        }
 
-                boolean added = cpWaitGrps.add(this);
+        /**
+         * @param partId Partition ID.
+         * @return {@code True} if partition has been evicted.
+         */
+        private boolean evicted(int partId) {
+            if (partId == PageIdAllocator.INDEX_PARTITION)
+                return false;
 
-                assert added;
-            }
-            catch (Throwable t) {
-                if (X.hasCause(t, NodeStoppingException.class))
-                    onCancelled();
-                else
-                    onDone(t);
-            }
+            return !parts.contains(partId) ||
+                grp.topology().localPartition(partId).state() == GridDhtPartitionState.EVICTED;
         }
 
         /**
          * @param partId Partition ID.
-         * @param off Start page offset.
-         * @param cnt Count of pages to scan.
          */
-        private void scanPartition(int partId, int off, int cnt) throws IgniteCheckedException {
+        private void scanPartition(int partId) {
+            long state = ctx.encryption().getEncryptionState(grp.groupId(), partId);
+
+            int off = ReencryptStateUtils.pageIndex(state);
+            int cnt = ReencryptStateUtils.pageCount(state);
+
             if (log.isDebugEnabled()) {
                 log.debug("Partition reencryption is started [grpId=" + grp.groupId() +
                     ", p=" + partId + ", remain=" + (cnt - off) + ", total=" + cnt + "]");
             }
 
-            while (off < cnt) {
-                int pagesCnt = Math.min(batchSize, cnt - off);
+            try {
+                while (off < cnt) {
+                    int pagesCnt = Math.min(batchSize, cnt - off);
 
-                limiter.acquire(pagesCnt);
+                    limiter.acquire(pagesCnt);
 
-                synchronized (this) {
-                    if (isDone() || !parts.contains(partId))
-                        break;
+                    synchronized (this) {
+                        if (isDone())
+                            return;
 
-                    ctx.cache().context().database().checkpointReadLock();
+                        ctx.cache().context().database().checkpointReadLock();
 
-                    try {
-                        off += scanPages(partId, off, pagesCnt);
-                    }
-                    finally {
-                        ctx.cache().context().database().checkpointReadUnlock();
+                        try {
+                            if (evicted(partId))
+                                return;
+
+                            off += scanPages(partId, off, pagesCnt);
+                        }
+                        finally {
+                            ctx.cache().context().database().checkpointReadUnlock();
+                        }
                     }
-                }
 
-                remainingPagesCntr.addAndGet(-pagesCnt);
+                    remainingPagesCntr.addAndGet(-pagesCnt);
 
-                ctx.encryption().setEncryptionState(grp, partId, off, cnt);
-            }
+                    ctx.encryption().setEncryptionState(grp, partId, off, cnt);
+                }
 
-            if (log.isDebugEnabled()) {
-                log.debug("Partition reencryption is finished " +
-                    "[grpId=" + grp.groupId() +
-                    ", p=" + partId +
-                    ", remain=" + (cnt - off) +
-                    ", total=" + cnt + "]");
+                parts.remove(partId);
+            } catch (Throwable t) {
+                if (X.hasCause(t, NodeStoppingException.class))
+                    onCancelled();
+                else
+                    onDone(t);
+            } finally {
+                checkComplete();
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Partition reencryption is finished " +
+                        "[grpId=" + grp.groupId() +
+                        ", p=" + partId +
+                        ", remain=" + (cnt - off) +
+                        ", total=" + cnt + "]");
+                }
             }
         }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
index 2db0172..0bb90d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
@@ -910,6 +910,14 @@ public class GridEncryptionManager extends GridManagerAdapter<EncryptionSpi> imp
     }
 
     /**
+     * @param grp Cache group.
+     * @param partId Partition ID.
+     */
+    public void onCancelDestroyPartitionStore(CacheGroupContext grp, int partId) {
+        pageScanner.includePartition(grp.groupId(), partId);
+    }
+
+    /**
      * Callback when WAL segment is removed.
      *
      * @param segmentIdx WAL segment index.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 8fa76c8..176a3e8 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -2906,9 +2906,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
      * @param grpId Group ID.
      * @param partId Partition ID.
      * @throws IgniteCheckedException If failed.
+     * @return {@code True} if the request to destroy the partition was canceled.
      */
-    public void cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException {
-        checkpointManager.cancelOrWaitPartitionDestroy(grpId, partId);
+    public boolean cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException {
+        return checkpointManager.cancelOrWaitPartitionDestroy(grpId, partId);
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 2873d2d..af0c4d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -243,8 +243,13 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
 
     /** {@inheritDoc} */
     @Override protected CacheDataStore createCacheDataStore0(int p) throws IgniteCheckedException {
-        if (ctx.database() instanceof GridCacheDatabaseSharedManager)
-            ((GridCacheDatabaseSharedManager) ctx.database()).cancelOrWaitPartitionDestroy(grp.groupId(), p);
+        if (ctx.database() instanceof GridCacheDatabaseSharedManager) {
+            boolean canceled =
+                ((GridCacheDatabaseSharedManager)ctx.database()).cancelOrWaitPartitionDestroy(grp.groupId(), p);
+
+            if (canceled && grp.config().isEncryptionEnabled())
+                ctx.kernalContext().encryption().onCancelDestroyPartitionStore(grp, p);
+        }
 
         boolean exists = ctx.pageStore() != null && ctx.pageStore().exists(grp.groupId(), p);
 
@@ -1015,15 +1020,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
     public void destroyPartitionStore(int grpId, int partId) throws IgniteCheckedException {
         PageMemoryEx pageMemory = (PageMemoryEx)grp.dataRegion().pageMemory();
 
-        if (grp.config().isEncryptionEnabled())
-            ctx.kernalContext().encryption().onDestroyPartitionStore(grp, partId);
-
         int tag = pageMemory.invalidate(grp.groupId(), partId);
 
         if (grp.walEnabled())
             ctx.wal().log(new PartitionDestroyRecord(grp.groupId(), partId));
 
         ctx.pageStore().onPartitionDestroyed(grpId, partId, tag);
+
+        if (grp.config().isEncryptionEnabled())
+            ctx.kernalContext().encryption().onDestroyPartitionStore(grp, partId);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java
index 71abc7f..73b05d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java
@@ -370,12 +370,12 @@ public class CheckpointManager {
     /**
      * @param grpId Group ID.
      * @param partId Partition ID.
+     * @return {@code True} if the request to destroy the partition was canceled.
      */
-    public void cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException {
+    public boolean cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException {
         Checkpointer cp = checkpointer;
 
-        if (cp != null)
-            checkpointer.cancelOrWaitPartitionDestroy(grpId, partId);
+        return cp != null && cp.cancelOrWaitPartitionDestroy(grpId, partId);
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
index d18a0ca..1ad3160 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
@@ -739,31 +739,42 @@ public class Checkpointer extends GridWorker {
     /**
      * @param grpId Group ID.
      * @param partId Partition ID.
+     * @return {@code True} if the request to destroy the partition was canceled.
      */
-    public void cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException {
+    public boolean cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException {
         PartitionDestroyRequest req;
+        boolean canceled = false;
 
         synchronized (this) {
-            req = scheduledCp.getDestroyQueue().cancelDestroy(grpId, partId);
-        }
+            req = scheduledCp.getDestroyQueue().removeRequest(grpId, partId);
 
-        if (req != null)
-            req.waitCompleted();
+            if (req != null) {
+                canceled = req.cancel();
 
-        CheckpointProgressImpl cur;
+                assert canceled;
+            }
 
-        synchronized (this) {
-            cur = curCpProgress;
+            CheckpointProgressImpl cur = curCpProgress;
 
-            if (cur != null)
-                req = cur.getDestroyQueue().cancelDestroy(grpId, partId);
+            if (cur != null) {
+                req = cur.getDestroyQueue().removeRequest(grpId, partId);
+
+                if (req != null)
+                    canceled = req.cancel();
+            }
         }
 
-        if (req != null)
-            req.waitCompleted();
+        if (!canceled) {
+            if (req != null)
+                req.waitCompleted();
+
+            return false;
+        }
 
-        if (req != null && log.isDebugEnabled())
+        if (log.isDebugEnabled())
             log.debug("Partition file destroy has cancelled [grpId=" + grpId + ", partId=" + partId + "]");
+
+        return true;
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java
index 653f53f..5e1a494 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java
@@ -282,12 +282,12 @@ public class LightweightCheckpointManager {
     /**
      * @param grpId Group ID.
      * @param partId Partition ID.
+     * @return {@code True} if the request to destroy the partition was canceled.
      */
-    public void cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException {
+    public boolean cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException {
         Checkpointer cp = checkpointer;
 
-        if (cp != null)
-            checkpointer.cancelOrWaitPartitionDestroy(grpId, partId);
+        return cp != null && cp.cancelOrWaitPartitionDestroy(grpId, partId);
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/PartitionDestroyQueue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/PartitionDestroyQueue.java
index 48a1eff..8a4c549 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/PartitionDestroyQueue.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/PartitionDestroyQueue.java
@@ -47,24 +47,12 @@ public class PartitionDestroyQueue {
     }
 
     /**
-     * @param destroyId Destroy ID.
-     * @return Destroy request to complete if was not concurrently cancelled.
-     */
-    private PartitionDestroyRequest beginDestroy(T2<Integer, Integer> destroyId) {
-        PartitionDestroyRequest rmvd = pendingReqs.remove(destroyId);
-
-        return rmvd == null ? null : rmvd.beginDestroy() ? rmvd : null;
-    }
-
-    /**
      * @param grpId Group ID.
      * @param partId Partition ID.
-     * @return Destroy request to wait for if destroy has begun.
+     * @return Destroy request that was removed from the queue or {@code null} if the request was not found.
      */
-    public PartitionDestroyRequest cancelDestroy(int grpId, int partId) {
-        PartitionDestroyRequest rmvd = pendingReqs.remove(new T2<>(grpId, partId));
-
-        return rmvd == null ? null : !rmvd.cancel() ? rmvd : null;
+    public PartitionDestroyRequest removeRequest(int grpId, int partId) {
+        return pendingReqs.remove(new T2<>(grpId, partId));
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/encryption/AbstractEncryptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/encryption/AbstractEncryptionTest.java
index 2911f6e..ddb6718 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/encryption/AbstractEncryptionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/encryption/AbstractEncryptionTest.java
@@ -410,6 +410,8 @@ public abstract class AbstractEncryptionTest extends GridCommonAbstractTest {
 
             assertTrue(fut.isDone());
 
+            assertEquals(0, encryption.getBytesLeftForReencryption(grpId));
+
             List<Integer> parts = IntStream.range(0, grp.shared().affinity().affinity(grpId).partitions())
                 .boxed().collect(Collectors.toList());
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/encryption/CacheGroupReencryptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/encryption/CacheGroupReencryptionTest.java
index a47a0ee..8bd96e1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/encryption/CacheGroupReencryptionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/encryption/CacheGroupReencryptionTest.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
@@ -88,6 +89,9 @@ public class CacheGroupReencryptionTest extends AbstractEncryptionTest {
     /** The number of pages that is scanned during re-encryption under checkpoint lock. */
     private int pageScanBatchSize = EncryptionConfiguration.DFLT_REENCRYPTION_BATCH_SIZE;
 
+    /** Checkpoint frequency (seconds). */
+    private long checkpointFreq = 30;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(name);
@@ -109,7 +113,7 @@ public class CacheGroupReencryptionTest extends AbstractEncryptionTest {
             .setWalSegmentSize(10 * 1024 * 1024)
             .setWalSegments(4)
             .setMaxWalArchiveSize(100 * 1024 * 1024L)
-            .setCheckpointFrequency(30 * 1000L)
+            .setCheckpointFrequency(TimeUnit.SECONDS.toMillis(checkpointFreq))
             .setWalMode(LOG_ONLY)
             .setFileIOFactory(new FailingFileIOFactory(new RandomAccessFileIOFactory(), failFileIO))
             .setEncryptionConfiguration(encCfg);
@@ -634,6 +638,7 @@ public class CacheGroupReencryptionTest extends AbstractEncryptionTest {
     public void testChangeBaseline() throws Exception {
         backups = 1;
         pageScanRate = 2;
+        checkpointFreq = 10;
 
         T2<IgniteEx, IgniteEx> nodes = startTestGrids(true);
 
@@ -741,6 +746,8 @@ public class CacheGroupReencryptionTest extends AbstractEncryptionTest {
         validateMetrics(node0, false);
         validateMetrics(node1, false);
 
+        forceCheckpoint();
+
         pageScanRate = DFLT_REENCRYPTION_RATE_MBPS;
 
         stopAllGrids();