You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2021/03/26 10:35:46 UTC
[ignite] branch master updated: IGNITE-14222 Exclude evicted
partitions from re-encryption - Fixes #8846.
This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 47a9014 IGNITE-14222 Exclude evicted partitions from re-encryption - Fixes #8846.
47a9014 is described below
commit 47a90145ae2e2788f53775df8da5edf607b14fb3
Author: Pavel Pereslegin <xx...@gmail.com>
AuthorDate: Fri Mar 26 13:18:17 2021 +0300
IGNITE-14222 Exclude evicted partitions from re-encryption - Fixes #8846.
Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
.../managers/encryption/CacheGroupPageScanner.java | 234 +++++++++++++--------
.../managers/encryption/GridEncryptionManager.java | 8 +
.../GridCacheDatabaseSharedManager.java | 5 +-
.../cache/persistence/GridCacheOffheapManager.java | 15 +-
.../persistence/checkpoint/CheckpointManager.java | 6 +-
.../cache/persistence/checkpoint/Checkpointer.java | 37 ++--
.../checkpoint/LightweightCheckpointManager.java | 6 +-
.../checkpoint/PartitionDestroyQueue.java | 18 +-
.../encryption/AbstractEncryptionTest.java | 2 +
.../encryption/CacheGroupReencryptionTest.java | 9 +-
10 files changed, 211 insertions(+), 129 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/CacheGroupPageScanner.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/CacheGroupPageScanner.java
index c2a6895..81ff3e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/CacheGroupPageScanner.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/CacheGroupPageScanner.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
@@ -150,12 +151,14 @@ public class CacheGroupPageScanner implements CheckpointListener {
try {
for (GroupScanTask grpScanTask : completeCandidates) {
- grps.remove(grpScanTask.groupId());
+ grps.remove(grpScanTask.group().groupId());
grpScanTask.onDone();
- if (log.isInfoEnabled())
- log.info("Cache group reencryption is finished [grpId=" + grpScanTask.groupId() + "]");
+ if (log.isInfoEnabled()) {
+ log.info("Cache group reencryption is finished [grp=" +
+ grpScanTask.group().cacheOrGroupName() + "]");
+ }
}
if (!grps.isEmpty())
@@ -191,6 +194,8 @@ public class CacheGroupPageScanner implements CheckpointListener {
return new GridFinishedFuture<>();
}
+ GroupScanTask grpScanTask = new GroupScanTask(grp);
+
lock.lock();
try {
@@ -209,39 +214,36 @@ public class CacheGroupPageScanner implements CheckpointListener {
return prevState;
}
- Set<Integer> parts = new HashSet<>();
- long[] pagesLeft = new long[1];
-
- forEachPageStore(grp, new IgniteInClosureX<Integer>() {
- @Override public void applyx(Integer partId) {
- long encState = ctx.encryption().getEncryptionState(grpId, partId);
+ grps.put(grpId, grpScanTask);
+ } finally {
+ lock.unlock();
+ }
- if (encState == 0) {
- if (log.isDebugEnabled())
- log.debug("Skipping partition reencryption [grp=" + grpId + ", p=" + partId + "]");
+ singleExecSvc.submit(() -> schedule0(grpScanTask));
- return;
- }
+ return grpScanTask;
+ }
- parts.add(partId);
+ /**
+ * @param grpScanTask Cache group scan task.
+ */
+ private void schedule0(GroupScanTask grpScanTask) {
+ try {
+ forEachPageStore(grpScanTask.group(), new IgniteInClosureX<Integer>() {
+ @Override public void applyx(Integer partId) {
+ long encState = ctx.encryption().getEncryptionState(grpScanTask.group().groupId(), partId);
- pagesLeft[0] += (ReencryptStateUtils.pageCount(encState) - ReencryptStateUtils.pageIndex(encState));
+ if (encState != 0)
+ grpScanTask.addPartition(partId, encState);
}
});
- GroupScanTask grpScan = new GroupScanTask(grp, parts, pagesLeft[0]);
-
- singleExecSvc.submit(grpScan);
-
if (log.isInfoEnabled())
- log.info("Scheduled reencryption [grpId=" + grpId + "]");
+ log.info("Scheduled reencryption [grp=" + grpScanTask.group().cacheOrGroupName() + "]");
- grps.put(grpId, grpScan);
-
- return grpScan;
- }
- finally {
- lock.unlock();
+ grpScanTask.checkComplete();
+ } catch (IgniteCheckedException e) {
+ grpScanTask.onDone(e);
}
}
@@ -291,6 +293,17 @@ public class CacheGroupPageScanner implements CheckpointListener {
}
/**
+ * @param grpId Cache group ID.
+ * @param partId Partition ID.
+ */
+ public void includePartition(int grpId, int partId) {
+ GroupScanTask grpScanTask = grps.get(grpId);
+
+ if (grpScanTask != null)
+ grpScanTask.schedulePending(partId);
+ }
+
+ /**
* Collect current number of pages in the specified cache group.
*
* @param grp Cache group.
@@ -385,30 +398,42 @@ public class CacheGroupPageScanner implements CheckpointListener {
/**
* Cache group partition scanning task.
*/
- private class GroupScanTask extends GridFutureAdapter<Void> implements Runnable {
+ private class GroupScanTask extends GridFutureAdapter<Void> {
/** Cache group ID. */
private final CacheGroupContext grp;
- /** Partition IDs. */
- private final Set<Integer> parts;
+ /** Partitions to reencrypt. */
+ private final Set<Integer> parts = new GridConcurrentHashSet<>();
/** Page memory. */
private final PageMemoryEx pageMem;
/** Total memory pages left for reencryption. */
- private final AtomicLong remainingPagesCntr;
+ private final AtomicLong remainingPagesCntr = new AtomicLong();
/**
* @param grp Cache group.
*/
- public GroupScanTask(CacheGroupContext grp, Set<Integer> parts, long remainingPagesCnt) {
+ public GroupScanTask(CacheGroupContext grp) {
this.grp = grp;
- this.parts = new GridConcurrentHashSet<>(parts);
- remainingPagesCntr = new AtomicLong(remainingPagesCnt);
pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
}
+ /**
+ * Schedule partition re-encryption.
+ *
+ * @param partId Partition ID.
+ * @param state Reencryption status.
+ */
+ public synchronized void addPartition(int partId, long state) {
+ remainingPagesCntr.addAndGet(ReencryptStateUtils.pageCount(state) - ReencryptStateUtils.pageIndex(state));
+
+ parts.add(partId);
+
+ schedulePartitionScan(partId);
+ }
+
/** {@inheritDoc} */
@Override public synchronized boolean cancel() throws IgniteCheckedException {
return onCancelled();
@@ -421,18 +446,36 @@ public class CacheGroupPageScanner implements CheckpointListener {
* @return {@code True} if reencryption was cancelled.
*/
public synchronized boolean excludePartition(int partId) {
- long state = ctx.encryption().getEncryptionState(groupId(), partId);
+ if (!parts.remove(partId))
+ return false;
+
+ long state = ctx.encryption().getEncryptionState(grp.groupId(), partId);
- remainingPagesCntr.addAndGet(ReencryptStateUtils.pageIndex(state) - ReencryptStateUtils.pageCount(state));
+ long pagesLeft = remainingPagesCntr.addAndGet(
+ ReencryptStateUtils.pageIndex(state) - ReencryptStateUtils.pageCount(state));
- return parts.remove(partId);
+ if (pagesLeft == 0)
+ checkComplete();
+
+ return true;
}
/**
- * @return Cache group ID.
+ * @param partId Partition ID.
*/
- public int groupId() {
- return grp.groupId();
+ public synchronized void schedulePending(int partId) {
+ if (isDone())
+ return;
+
+ if (parts.contains(partId))
+ schedulePartitionScan(partId);
+ }
+
+ /**
+ * @return Cache group context.
+ */
+ public CacheGroupContext group() {
+ return grp;
}
/**
@@ -442,74 +485,91 @@ public class CacheGroupPageScanner implements CheckpointListener {
return remainingPagesCntr.get();
}
- /** {@inheritDoc} */
- @Override public void run() {
- try {
- for (int partId : parts) {
- long state = ctx.encryption().getEncryptionState(grp.groupId(), partId);
-
- if (state == 0)
- continue;
-
- scanPartition(partId, ReencryptStateUtils.pageIndex(state), ReencryptStateUtils.pageCount(state));
+ /**
+ * @param partId Partition ID.
+ */
+ private void schedulePartitionScan(int partId) {
+ singleExecSvc.submit(() -> scanPartition(partId));
+ }
- if (isDone())
- return;
- }
+ /**
+ * Check the completeness of the cache group scan.
+ */
+ private synchronized void checkComplete() {
+ if (!isDone() && parts.isEmpty() && !cpWaitGrps.contains(this))
+ cpWaitGrps.add(this);
+ }
- boolean added = cpWaitGrps.add(this);
+ /**
+ * @param partId Partition ID.
+ * @return {@code True} if partition has been evicted.
+ */
+ private boolean evicted(int partId) {
+ if (partId == PageIdAllocator.INDEX_PARTITION)
+ return false;
- assert added;
- }
- catch (Throwable t) {
- if (X.hasCause(t, NodeStoppingException.class))
- onCancelled();
- else
- onDone(t);
- }
+ return !parts.contains(partId) ||
+ grp.topology().localPartition(partId).state() == GridDhtPartitionState.EVICTED;
}
/**
* @param partId Partition ID.
- * @param off Start page offset.
- * @param cnt Count of pages to scan.
*/
- private void scanPartition(int partId, int off, int cnt) throws IgniteCheckedException {
+ private void scanPartition(int partId) {
+ long state = ctx.encryption().getEncryptionState(grp.groupId(), partId);
+
+ int off = ReencryptStateUtils.pageIndex(state);
+ int cnt = ReencryptStateUtils.pageCount(state);
+
if (log.isDebugEnabled()) {
log.debug("Partition reencryption is started [grpId=" + grp.groupId() +
", p=" + partId + ", remain=" + (cnt - off) + ", total=" + cnt + "]");
}
- while (off < cnt) {
- int pagesCnt = Math.min(batchSize, cnt - off);
+ try {
+ while (off < cnt) {
+ int pagesCnt = Math.min(batchSize, cnt - off);
- limiter.acquire(pagesCnt);
+ limiter.acquire(pagesCnt);
- synchronized (this) {
- if (isDone() || !parts.contains(partId))
- break;
+ synchronized (this) {
+ if (isDone())
+ return;
- ctx.cache().context().database().checkpointReadLock();
+ ctx.cache().context().database().checkpointReadLock();
- try {
- off += scanPages(partId, off, pagesCnt);
- }
- finally {
- ctx.cache().context().database().checkpointReadUnlock();
+ try {
+ if (evicted(partId))
+ return;
+
+ off += scanPages(partId, off, pagesCnt);
+ }
+ finally {
+ ctx.cache().context().database().checkpointReadUnlock();
+ }
}
- }
- remainingPagesCntr.addAndGet(-pagesCnt);
+ remainingPagesCntr.addAndGet(-pagesCnt);
- ctx.encryption().setEncryptionState(grp, partId, off, cnt);
- }
+ ctx.encryption().setEncryptionState(grp, partId, off, cnt);
+ }
- if (log.isDebugEnabled()) {
- log.debug("Partition reencryption is finished " +
- "[grpId=" + grp.groupId() +
- ", p=" + partId +
- ", remain=" + (cnt - off) +
- ", total=" + cnt + "]");
+ parts.remove(partId);
+ } catch (Throwable t) {
+ if (X.hasCause(t, NodeStoppingException.class))
+ onCancelled();
+ else
+ onDone(t);
+ } finally {
+ checkComplete();
+
+ if (log.isDebugEnabled()) {
+ log.debug("Partition reencryption is finished " +
+ "[grpId=" + grp.groupId() +
+ ", p=" + partId +
+ ", remain=" + (cnt - off) +
+ ", total=" + cnt + "]");
+ }
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
index 2db0172..0bb90d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
@@ -910,6 +910,14 @@ public class GridEncryptionManager extends GridManagerAdapter<EncryptionSpi> imp
}
/**
+ * @param grp Cache group.
+ * @param partId Partition ID.
+ */
+ public void onCancelDestroyPartitionStore(CacheGroupContext grp, int partId) {
+ pageScanner.includePartition(grp.groupId(), partId);
+ }
+
+ /**
* Callback when WAL segment is removed.
*
* @param segmentIdx WAL segment index.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 8fa76c8..176a3e8 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -2906,9 +2906,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
* @param grpId Group ID.
* @param partId Partition ID.
* @throws IgniteCheckedException If failed.
+ * @return {@code True} if the request to destroy the partition was canceled.
*/
- public void cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException {
- checkpointManager.cancelOrWaitPartitionDestroy(grpId, partId);
+ public boolean cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException {
+ return checkpointManager.cancelOrWaitPartitionDestroy(grpId, partId);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 2873d2d..af0c4d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -243,8 +243,13 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
/** {@inheritDoc} */
@Override protected CacheDataStore createCacheDataStore0(int p) throws IgniteCheckedException {
- if (ctx.database() instanceof GridCacheDatabaseSharedManager)
- ((GridCacheDatabaseSharedManager) ctx.database()).cancelOrWaitPartitionDestroy(grp.groupId(), p);
+ if (ctx.database() instanceof GridCacheDatabaseSharedManager) {
+ boolean canceled =
+ ((GridCacheDatabaseSharedManager)ctx.database()).cancelOrWaitPartitionDestroy(grp.groupId(), p);
+
+ if (canceled && grp.config().isEncryptionEnabled())
+ ctx.kernalContext().encryption().onCancelDestroyPartitionStore(grp, p);
+ }
boolean exists = ctx.pageStore() != null && ctx.pageStore().exists(grp.groupId(), p);
@@ -1015,15 +1020,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
public void destroyPartitionStore(int grpId, int partId) throws IgniteCheckedException {
PageMemoryEx pageMemory = (PageMemoryEx)grp.dataRegion().pageMemory();
- if (grp.config().isEncryptionEnabled())
- ctx.kernalContext().encryption().onDestroyPartitionStore(grp, partId);
-
int tag = pageMemory.invalidate(grp.groupId(), partId);
if (grp.walEnabled())
ctx.wal().log(new PartitionDestroyRecord(grp.groupId(), partId));
ctx.pageStore().onPartitionDestroyed(grpId, partId, tag);
+
+ if (grp.config().isEncryptionEnabled())
+ ctx.kernalContext().encryption().onDestroyPartitionStore(grp, partId);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java
index 71abc7f..73b05d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java
@@ -370,12 +370,12 @@ public class CheckpointManager {
/**
* @param grpId Group ID.
* @param partId Partition ID.
+ * @return {@code True} if the request to destroy the partition was canceled.
*/
- public void cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException {
+ public boolean cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException {
Checkpointer cp = checkpointer;
- if (cp != null)
- checkpointer.cancelOrWaitPartitionDestroy(grpId, partId);
+ return cp != null && cp.cancelOrWaitPartitionDestroy(grpId, partId);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
index d18a0ca..1ad3160 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
@@ -739,31 +739,42 @@ public class Checkpointer extends GridWorker {
/**
* @param grpId Group ID.
* @param partId Partition ID.
+ * @return {@code True} if the request to destroy the partition was canceled.
*/
- public void cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException {
+ public boolean cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException {
PartitionDestroyRequest req;
+ boolean canceled = false;
synchronized (this) {
- req = scheduledCp.getDestroyQueue().cancelDestroy(grpId, partId);
- }
+ req = scheduledCp.getDestroyQueue().removeRequest(grpId, partId);
- if (req != null)
- req.waitCompleted();
+ if (req != null) {
+ canceled = req.cancel();
- CheckpointProgressImpl cur;
+ assert canceled;
+ }
- synchronized (this) {
- cur = curCpProgress;
+ CheckpointProgressImpl cur = curCpProgress;
- if (cur != null)
- req = cur.getDestroyQueue().cancelDestroy(grpId, partId);
+ if (cur != null) {
+ req = cur.getDestroyQueue().removeRequest(grpId, partId);
+
+ if (req != null)
+ canceled = req.cancel();
+ }
}
- if (req != null)
- req.waitCompleted();
+ if (!canceled) {
+ if (req != null)
+ req.waitCompleted();
+
+ return false;
+ }
- if (req != null && log.isDebugEnabled())
+ if (log.isDebugEnabled())
log.debug("Partition file destroy has cancelled [grpId=" + grpId + ", partId=" + partId + "]");
+
+ return true;
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java
index 653f53f..5e1a494 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java
@@ -282,12 +282,12 @@ public class LightweightCheckpointManager {
/**
* @param grpId Group ID.
* @param partId Partition ID.
+ * @return {@code True} if the request to destroy the partition was canceled.
*/
- public void cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException {
+ public boolean cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException {
Checkpointer cp = checkpointer;
- if (cp != null)
- checkpointer.cancelOrWaitPartitionDestroy(grpId, partId);
+ return cp != null && cp.cancelOrWaitPartitionDestroy(grpId, partId);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/PartitionDestroyQueue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/PartitionDestroyQueue.java
index 48a1eff..8a4c549 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/PartitionDestroyQueue.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/PartitionDestroyQueue.java
@@ -47,24 +47,12 @@ public class PartitionDestroyQueue {
}
/**
- * @param destroyId Destroy ID.
- * @return Destroy request to complete if was not concurrently cancelled.
- */
- private PartitionDestroyRequest beginDestroy(T2<Integer, Integer> destroyId) {
- PartitionDestroyRequest rmvd = pendingReqs.remove(destroyId);
-
- return rmvd == null ? null : rmvd.beginDestroy() ? rmvd : null;
- }
-
- /**
* @param grpId Group ID.
* @param partId Partition ID.
- * @return Destroy request to wait for if destroy has begun.
+ * @return Destroy request that was removed from the queue or {@code null} if the request was not found.
*/
- public PartitionDestroyRequest cancelDestroy(int grpId, int partId) {
- PartitionDestroyRequest rmvd = pendingReqs.remove(new T2<>(grpId, partId));
-
- return rmvd == null ? null : !rmvd.cancel() ? rmvd : null;
+ public PartitionDestroyRequest removeRequest(int grpId, int partId) {
+ return pendingReqs.remove(new T2<>(grpId, partId));
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/encryption/AbstractEncryptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/encryption/AbstractEncryptionTest.java
index 2911f6e..ddb6718 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/encryption/AbstractEncryptionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/encryption/AbstractEncryptionTest.java
@@ -410,6 +410,8 @@ public abstract class AbstractEncryptionTest extends GridCommonAbstractTest {
assertTrue(fut.isDone());
+ assertEquals(0, encryption.getBytesLeftForReencryption(grpId));
+
List<Integer> parts = IntStream.range(0, grp.shared().affinity().affinity(grpId).partitions())
.boxed().collect(Collectors.toList());
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/encryption/CacheGroupReencryptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/encryption/CacheGroupReencryptionTest.java
index a47a0ee..8bd96e1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/encryption/CacheGroupReencryptionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/encryption/CacheGroupReencryptionTest.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
@@ -88,6 +89,9 @@ public class CacheGroupReencryptionTest extends AbstractEncryptionTest {
/** The number of pages that is scanned during re-encryption under checkpoint lock. */
private int pageScanBatchSize = EncryptionConfiguration.DFLT_REENCRYPTION_BATCH_SIZE;
+ /** Checkpoint frequency (seconds). */
+ private long checkpointFreq = 30;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(name);
@@ -109,7 +113,7 @@ public class CacheGroupReencryptionTest extends AbstractEncryptionTest {
.setWalSegmentSize(10 * 1024 * 1024)
.setWalSegments(4)
.setMaxWalArchiveSize(100 * 1024 * 1024L)
- .setCheckpointFrequency(30 * 1000L)
+ .setCheckpointFrequency(TimeUnit.SECONDS.toMillis(checkpointFreq))
.setWalMode(LOG_ONLY)
.setFileIOFactory(new FailingFileIOFactory(new RandomAccessFileIOFactory(), failFileIO))
.setEncryptionConfiguration(encCfg);
@@ -634,6 +638,7 @@ public class CacheGroupReencryptionTest extends AbstractEncryptionTest {
public void testChangeBaseline() throws Exception {
backups = 1;
pageScanRate = 2;
+ checkpointFreq = 10;
T2<IgniteEx, IgniteEx> nodes = startTestGrids(true);
@@ -741,6 +746,8 @@ public class CacheGroupReencryptionTest extends AbstractEncryptionTest {
validateMetrics(node0, false);
validateMetrics(node1, false);
+ forceCheckpoint();
+
pageScanRate = DFLT_REENCRYPTION_RATE_MBPS;
stopAllGrids();