You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/06/04 08:45:35 UTC

[ignite] branch master updated: IGNITE-16582 Improve behavior of speed-based throttling when dirty pages ratio is low (#9924)

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b58545534a IGNITE-16582 Improve behavior of speed-based throttling when dirty pages ratio is low (#9924)
4b58545534a is described below

commit 4b58545534a49a39b8fd60bb560d1bec4ae235fc
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Sat Jun 4 12:45:24 2022 +0400

    IGNITE-16582 Improve behavior of speed-based throttling when dirty pages ratio is low (#9924)
---
 .../checkpoint/CheckpointProgressImpl.java         |   6 +-
 .../pagemem/PagesWriteSpeedBasedThrottle.java      |   8 +-
 ...edBasedMemoryConsumptionThrottlingStrategy.java | 170 ++++++++++-----------
 ...va => AbstractSlowCheckpointFileIOFactory.java} |  17 ++-
 .../db/CheckpointBufferDeadlockTest.java           |   2 +-
 .../db/SlowCheckpointMetadataFileIOFactory.java    |  41 +++++
 .../db/SlowCheckpointPagesFileIOFactory.java       |  41 +++++
 .../pagemem/IgniteThrottlingUnitTest.java          |  52 ++++---
 .../pagemem/PagesWriteThrottleSandboxTest.java     |  69 +++++++--
 .../pagemem/PagesWriteThrottleSmokeTest.java       |   4 +-
 .../pagemem/SpeedBasedThrottleIntegrationTest.java | 114 ++++++++++++++
 .../performancestatistics/CheckpointTest.java      |   4 +-
 .../ignite/testsuites/IgnitePdsTestSuite5.java     |   2 +
 13 files changed, 390 insertions(+), 140 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointProgressImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointProgressImpl.java
index 5c78bccc94e..1f0e7cebd47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointProgressImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointProgressImpl.java
@@ -246,11 +246,11 @@ public class CheckpointProgressImpl implements CheckpointProgress {
     }
 
     /** {@inheritDoc} */
-    @Override public void updateEvictedPages(int deltha) {
-        A.ensure(deltha > 0, "param must be positive");
+    @Override public void updateEvictedPages(int delta) {
+        A.ensure(delta > 0, "param must be positive");
 
         if (evictedPagesCounter() != null)
-            evictedPagesCounter().addAndGet(deltha);
+            evictedPagesCounter().addAndGet(delta);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
index 276b448e454..f9dae4a5854 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.cache.persistence.checkpoint.Checkp
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteOutClosure;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * Throttles threads that generate dirty pages during ongoing checkpoint.
@@ -104,7 +105,7 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy {
         this.log = log;
 
         cleanPagesProtector = new SpeedBasedMemoryConsumptionThrottlingStrategy(pageMemory, cpProgress,
-                markSpeedAndAvgParkTime);
+            markSpeedAndAvgParkTime);
         cpBufferWatchdog = new CheckpointBufferOverflowWatchdog(pageMemory);
     }
 
@@ -186,7 +187,7 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy {
 
         if (prevWarnTime.compareAndSet(prevWarningNs, curNs) && log.isInfoEnabled()) {
             String msg = String.format("Throttling is applied to page modifications " +
-                    "[percentOfPartTime=%.2f, markDirty=%d pages/sec, checkpointWrite=%d pages/sec, " +
+                    "[fractionOfParkTime=%.2f, markDirty=%d pages/sec, checkpointWrite=%d pages/sec, " +
                     "estIdealMarkDirty=%d pages/sec, curDirty=%.2f, maxDirty=%.2f, avgParkTime=%d ns, " +
                     "pages: (total=%d, evicted=%d, written=%d, synced=%d, cpBufUsed=%d, cpBufTotal=%d)]",
                 weight, getMarkDirtySpeed(), getCpWriteSpeed(),
@@ -210,6 +211,7 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy {
      * @param curCpWriteSpeed average checkpoint write speed, pages/sec.
      * @return time in nanoseconds to part or 0 if throttling is not required.
      */
+    @TestOnly
     long getCleanPagesProtectionParkTime(
             double dirtyPagesRatio,
             long fullyCompletedPages,
@@ -293,7 +295,7 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy {
         if (speed <= 0)
             return 0;
 
-        long timeForOnePage = cleanPagesProtector.calcDelayTime(speed);
+        long timeForOnePage = cleanPagesProtector.nsPerOperation(speed);
 
         if (timeForOnePage == 0)
             return 0;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/SpeedBasedMemoryConsumptionThrottlingStrategy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/SpeedBasedMemoryConsumptionThrottlingStrategy.java
index 2a5aba62511..9fe7a90eb90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/SpeedBasedMemoryConsumptionThrottlingStrategy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/SpeedBasedMemoryConsumptionThrottlingStrategy.java
@@ -57,7 +57,7 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy {
     /**
      * Total pages possible to store in page memory.
      */
-    private final long totalPages;
+    private volatile long pageMemTotalPages;
 
     /**
      * Last estimated speed for marking all clear pages as dirty till the end of checkpoint.
@@ -111,8 +111,6 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy {
         this.pageMemory = pageMemory;
         this.cpProgress = cpProgress;
         this.markSpeedAndAvgParkTime = markSpeedAndAvgParkTime;
-
-        totalPages = pageMemory.totalPages();
     }
 
     /**
@@ -149,7 +147,7 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy {
         final int cpWrittenPages = writtenPagesCounter.get();
         final long donePages = cpDonePagesEstimation(cpWrittenPages);
 
-        final long markDirtySpeed = markSpeedAndAvgParkTime.getSpeedOpsPerSec(curNanoTime);
+        final long instantaneousMarkDirtySpeed = markSpeedAndAvgParkTime.getSpeedOpsPerSec(curNanoTime);
         // NB: we update progress for speed calculation only in this (clean pages protection) scenario, because
         // we only use the computed speed in this same scenario and for reporting in logs (where it's not super
         // important to display an ideally accurate speed), but not in the CP Buffer protection scenario.
@@ -157,7 +155,8 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy {
         // The progress is set to 0 at the beginning of a checkpoint, so we can be sure that the start time remembered
         // in cpWriteSpeed is pretty accurate even without writing to cpWriteSpeed from this method.
         cpWriteSpeed.setProgress(donePages, curNanoTime);
-        final long curCpWriteSpeed = cpWriteSpeed.getOpsPerSecond(curNanoTime);
+        // TODO: IGNITE-16878 use exponential moving average so that we react to changes faster?
+        final long avgCpWriteSpeed = cpWriteSpeed.getOpsPerSecond(curNanoTime);
 
         final int cpTotalPages = cpTotalPages();
 
@@ -166,11 +165,11 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy {
             // CheckpointProgressImpl.clearCounters() is invoked at the end of a checkpoint (by falling through
             // between two volatile assignments). When we get here, we don't have any information about the total
             // number of pages in the current CP, so we calculate park time by only using information we have.
-            return parkTimeToThrottleByJustCPSpeed(markDirtySpeed, curCpWriteSpeed);
+            return parkTimeToThrottleByJustCPSpeed(instantaneousMarkDirtySpeed, avgCpWriteSpeed);
         }
         else {
-            return speedBasedParkTime(cpWrittenPages, donePages, markDirtySpeed,
-                    curCpWriteSpeed, cpTotalPages);
+            return speedBasedParkTime(cpWrittenPages, donePages, cpTotalPages, instantaneousMarkDirtySpeed,
+                    avgCpWriteSpeed);
         }
     }
 
@@ -183,6 +182,9 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy {
      * @return estimation of work done (in pages)
      */
     private int cpDonePagesEstimation(int cpWrittenPages) {
+        // TODO: IGNITE-16879 - this only works correctly if time-to-write a page is close to time-to-sync a page.
+        // In reality, this does not seem to hold, which produces wrong estimations. We could measure the real times
+        // in Checkpointer and make this estimation a lot more precise.
         return (cpWrittenPages + cpSyncedPages()) / 2;
     }
 
@@ -198,15 +200,15 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy {
         boolean throttleByCpSpeed = curCpWriteSpeed > 0 && markDirtySpeed > curCpWriteSpeed;
 
         if (throttleByCpSpeed) {
-            return calcDelayTime(curCpWriteSpeed);
+            return nsPerOperation(curCpWriteSpeed);
         }
 
         return 0;
     }
 
     /***/
-    private long speedBasedParkTime(int cpWrittenPages, long donePages, long markDirtySpeed,
-                                    long curCpWriteSpeed, int cpTotalPages) {
+    private long speedBasedParkTime(int cpWrittenPages, long donePages, int cpTotalPages,
+                                    long instantaneousMarkDirtySpeed, long avgCpWriteSpeed) {
         final double dirtyPagesRatio = pageMemory.getDirtyPagesRatio();
 
         currDirtyRatio = dirtyPagesRatio;
@@ -220,8 +222,8 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy {
                     donePages,
                     notEvictedPagesTotal(cpTotalPages),
                     threadIdsCount(),
-                    markDirtySpeed,
-                    curCpWriteSpeed);
+                    instantaneousMarkDirtySpeed,
+                    avgCpWriteSpeed);
         }
     }
 
@@ -237,8 +239,8 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy {
      * @param donePages           roughly, written & fsynced pages count.
      * @param cpTotalPages        total checkpoint scope.
      * @param nThreads            number of threads providing data during current checkpoint.
-     * @param markDirtySpeed      registered mark dirty speed, pages/sec.
-     * @param curCpWriteSpeed     average checkpoint write speed, pages/sec.
+     * @param instantaneousMarkDirtySpeed registered (during approx last second) mark dirty speed, pages/sec.
+     * @param avgCpWriteSpeed     average checkpoint write speed, pages/sec.
      * @return time in nanoseconds to part or 0 if throttling is not required.
      */
     long getParkTime(
@@ -246,84 +248,40 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy {
             long donePages,
             int cpTotalPages,
             int nThreads,
-            long markDirtySpeed,
-            long curCpWriteSpeed) {
+            long instantaneousMarkDirtySpeed,
+            long avgCpWriteSpeed) {
 
         final long targetSpeedToMarkAll = calcSpeedToMarkAllSpaceTillEndOfCp(dirtyPagesRatio, donePages,
-                curCpWriteSpeed, cpTotalPages);
+            avgCpWriteSpeed, cpTotalPages);
         final double targetCurrentDirtyRatio = targetCurrentDirtyRatio(donePages, cpTotalPages);
 
-        updateSpeedAndRatio(targetSpeedToMarkAll, targetCurrentDirtyRatio);
-
-        long delayByCpWrite = delayIfMarkingFasterThanCPWriteSpeedAllows(markDirtySpeed, curCpWriteSpeed,
-                dirtyPagesRatio, nThreads, targetSpeedToMarkAll, targetCurrentDirtyRatio);
-        long delayByMarkAllWrite = delayIfMarkingFasterThanTargetSpeedAllows(markDirtySpeed, dirtyPagesRatio, nThreads,
-                targetSpeedToMarkAll, targetCurrentDirtyRatio);
-
-        return Math.max(delayByCpWrite, delayByMarkAllWrite);
-    }
-
-    /***/
-    private long delayIfMarkingFasterThanCPWriteSpeedAllows(long markDirtySpeed, long curCpWriteSpeed,
-                                                            double dirtyPagesRatio, int nThreads,
-                                                            long targetSpeedToMarkAll, double targetCurrentDirtyRatio) {
-        final double allowedCpWriteSpeedExcessMultiplier = allowedCpWriteSpeedExcessMultiplier(markDirtySpeed,
-                dirtyPagesRatio, targetSpeedToMarkAll, targetCurrentDirtyRatio);
-        final boolean throttleByCpSpeed = curCpWriteSpeed > 0
-                && markDirtySpeed > (allowedCpWriteSpeedExcessMultiplier * curCpWriteSpeed);
-
-        if (!throttleByCpSpeed) {
-            return 0;
-        }
-
-        int slowdown = slowdownIfLowSpaceLeft(dirtyPagesRatio, targetCurrentDirtyRatio);
-        long nanosecsToMarkOnePage = TimeUnit.SECONDS.toNanos(1) * nThreads / markDirtySpeed;
-        long nanosecsToWriteOneCPPage = calcDelayTime(curCpWriteSpeed, nThreads, slowdown);
-        return nanosecsToWriteOneCPPage - nanosecsToMarkOnePage;
-    }
-
-    /***/
-    private double allowedCpWriteSpeedExcessMultiplier(long markDirtySpeed, double dirtyPagesRatio,
-                                                       long targetSpeedToMarkAll, double targetCurrentDirtyRatio) {
-        final boolean lowSpaceLeft = lowCleanSpaceLeft(dirtyPagesRatio, targetCurrentDirtyRatio);
-
-        // for case of speedForMarkAll >> markDirtySpeed, allow write little bit faster than CP average
-        final double allowWriteFasterThanCp;
-        if (markDirtySpeed > 0 && markDirtySpeed < targetSpeedToMarkAll)
-            allowWriteFasterThanCp = 0.1 * targetSpeedToMarkAll / markDirtySpeed;
-        else if (dirtyPagesRatio > targetCurrentDirtyRatio)
-            allowWriteFasterThanCp = 0.0;
-        else
-            allowWriteFasterThanCp = 0.1;
-
-        return lowSpaceLeft
-                ? 1.0
-                : 1.0 + allowWriteFasterThanCp;
-    }
-
-    /***/
-    private int slowdownIfLowSpaceLeft(double dirtyPagesRatio, double targetCurrentDirtyRatio) {
-        boolean lowSpaceLeft = lowCleanSpaceLeft(dirtyPagesRatio, targetCurrentDirtyRatio);
-        return slowdownIfLowSpaceLeft(lowSpaceLeft);
-    }
+        publishSpeedAndRatioForMetrics(targetSpeedToMarkAll, targetCurrentDirtyRatio);
 
-    /***/
-    private int slowdownIfLowSpaceLeft(boolean lowSpaceLeft) {
-        return lowSpaceLeft ? 3 : 1;
+        return delayIfMarkingFasterThanTargetSpeedAllows(instantaneousMarkDirtySpeed,
+            dirtyPagesRatio, nThreads, targetSpeedToMarkAll, targetCurrentDirtyRatio);
     }
 
     /***/
-    private long delayIfMarkingFasterThanTargetSpeedAllows(long markDirtySpeed, double dirtyPagesRatio, int nThreads,
+    private long delayIfMarkingFasterThanTargetSpeedAllows(long instantaneousMarkDirtySpeed, double dirtyPagesRatio,
+                                                           int nThreads,
                                                            long targetSpeedToMarkAll, double targetCurrentDirtyRatio) {
         final boolean lowSpaceLeft = lowCleanSpaceLeft(dirtyPagesRatio, targetCurrentDirtyRatio);
         final int slowdown = slowdownIfLowSpaceLeft(lowSpaceLeft);
 
         double multiplierForSpeedToMarkAll = lowSpaceLeft ? 0.8 : 1.0;
         boolean markingTooFastNow = targetSpeedToMarkAll > 0
-                && markDirtySpeed > multiplierForSpeedToMarkAll * targetSpeedToMarkAll;
+                && instantaneousMarkDirtySpeed > multiplierForSpeedToMarkAll * targetSpeedToMarkAll;
         boolean markedTooFastSinceCPStart = dirtyPagesRatio > targetCurrentDirtyRatio;
         boolean markingTooFast = markedTooFastSinceCPStart && markingTooFastNow;
-        return markingTooFast ? calcDelayTime(targetSpeedToMarkAll, nThreads, slowdown) : 0;
+
+        // We must NOT subtract nsPerOperation(instantaneousMarkDirtySpeed, nThreads)! If we do, the actual speed
+        // converges to a value that is 1-2 times higher than the target speed.
+        return markingTooFast ? nsPerOperation(targetSpeedToMarkAll, nThreads, slowdown) : 0;
+    }
+
+    /***/
+    private int slowdownIfLowSpaceLeft(boolean lowSpaceLeft) {
+        return lowSpaceLeft ? 3 : 1;
     }
 
     /**
@@ -338,9 +296,9 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy {
     }
 
     /***/
-    private void updateSpeedAndRatio(long speedForMarkAll, double targetDirtyRatio) {
-        this.speedForMarkAll = speedForMarkAll; //publish for metrics
-        this.targetDirtyRatio = targetDirtyRatio; //publish for metrics
+    private void publishSpeedAndRatioForMetrics(long speedForMarkAll, double targetDirtyRatio) {
+        this.speedForMarkAll = speedForMarkAll;
+        this.targetDirtyRatio = targetDirtyRatio;
     }
 
     /**
@@ -351,14 +309,14 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy {
      *
      * @param dirtyPagesRatio     current percent of dirty pages.
      * @param donePages           roughly, count of written and sync'ed pages
-     * @param curCpWriteSpeed     pages/second checkpoint write speed. 0 speed means 'no data'.
+     * @param avgCpWriteSpeed     pages/second checkpoint write speed. 0 speed means 'no data'.
      * @param cpTotalPages        total pages in checkpoint.
      * @return pages/second to mark to mark all clean pages as dirty till the end of checkpoint. 0 speed means 'no
      * data', or when we are not going to throttle due to the current dirty pages ratio being too high
      */
     private long calcSpeedToMarkAllSpaceTillEndOfCp(double dirtyPagesRatio, long donePages,
-                                                    long curCpWriteSpeed, int cpTotalPages) {
-        if (curCpWriteSpeed == 0)
+                                                    long avgCpWriteSpeed, int cpTotalPages) {
+        if (avgCpWriteSpeed == 0)
             return 0;
 
         if (cpTotalPages <= 0)
@@ -367,11 +325,30 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy {
         if (dirtyPagesRatio >= MAX_DIRTY_PAGES)
             return 0;
 
-        double remainedClearPages = (MAX_DIRTY_PAGES - dirtyPagesRatio) * totalPages;
+        // IDEA: the count of clean pages calculated here includes the pages under checkpoint. It is kind of
+        // legal because they can be written (using the Checkpoint Buffer to make a copy of the value to be
+        // checkpointed), but the CP Buffer is usually not too big, and if it gets nearly filled, writes become
+        // throttled really hard by exponential throttler. Maybe we should subtract the number of not-yet-written-by-CP
+        // pages from the count of clean pages? In such a case, we would lessen the risk of CP Buffer-caused throttling.
+        double remainedCleanPages = (MAX_DIRTY_PAGES - dirtyPagesRatio) * pageMemTotalPages();
+
+        double secondsTillCPEnd = 1.0 * (cpTotalPages - donePages) / avgCpWriteSpeed;
+
+        return (long)(remainedCleanPages / secondsTillCPEnd);
+    }
+
+    /** Returns total number of pages storable in page memory. */
+    private long pageMemTotalPages() {
+        long currentTotalPages = pageMemTotalPages;
+
+        if (currentTotalPages == 0) {
+            currentTotalPages = pageMemory.totalPages();
+            pageMemTotalPages = currentTotalPages;
+        }
 
-        double secondsTillCPEnd = 1.0 * (cpTotalPages - donePages) / curCpWriteSpeed;
+        assert currentTotalPages > 0 : "PageMemory.totalPages() is still 0";
 
-        return (long)(remainedClearPages / secondsTillCPEnd);
+        return currentTotalPages;
     }
 
     /**
@@ -457,24 +434,33 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy {
      * @param baseSpeed   speed to slow down.
      * @return sleep time in nanoseconds.
      */
-    long calcDelayTime(long baseSpeed) {
-        return calcDelayTime(baseSpeed, threadIdsCount(), 1);
+    long nsPerOperation(long baseSpeed) {
+        return nsPerOperation(baseSpeed, threadIdsCount());
     }
 
     /**
-     * @param baseSpeed   speed to slow down.
+     * @param speedPagesPerSec   speed to slow down.
+     * @param nThreads    operating threads.
+     * @return sleep time in nanoseconds.
+     */
+    private long nsPerOperation(long speedPagesPerSec, int nThreads) {
+        return nsPerOperation(speedPagesPerSec, nThreads, 1);
+    }
+
+    /**
+     * @param speedPagesPerSec   speed to slow down.
      * @param nThreads    operating threads.
      * @param factor      how much it is needed to slowdown base speed. 1 means delay to get exact base speed.
      * @return sleep time in nanoseconds.
      */
-    private long calcDelayTime(long baseSpeed, int nThreads, int factor) {
+    private long nsPerOperation(long speedPagesPerSec, int nThreads, int factor) {
         if (factor <= 0)
             throw new IllegalStateException("Coefficient should be positive");
 
-        if (baseSpeed <= 0)
+        if (speedPagesPerSec <= 0)
             return 0;
 
-        long updTimeNsForOnePage = TimeUnit.SECONDS.toNanos(1) * nThreads / (baseSpeed);
+        long updTimeNsForOnePage = TimeUnit.SECONDS.toNanos(1) * nThreads / (speedPagesPerSec);
 
         return factor * updTimeNsForOnePage;
     }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowCheckpointFileIOFactory.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/AbstractSlowCheckpointFileIOFactory.java
similarity index 83%
rename from modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowCheckpointFileIOFactory.java
rename to modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/AbstractSlowCheckpointFileIOFactory.java
index cad6b568f77..1a6f0f7b2ff 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowCheckpointFileIOFactory.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/AbstractSlowCheckpointFileIOFactory.java
@@ -29,9 +29,9 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactor
 import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
 
 /**
- * Create File I/O that emulates poor checkpoint write speed.
+ * Base factory for file I/O that emulates poor checkpoint write speed.
  */
-public class SlowCheckpointFileIOFactory implements FileIOFactory {
+public abstract class AbstractSlowCheckpointFileIOFactory implements FileIOFactory {
     /** Serial version uid. */
     private static final long serialVersionUID = 0L;
 
@@ -42,13 +42,13 @@ public class SlowCheckpointFileIOFactory implements FileIOFactory {
     private final AtomicBoolean slowCheckpointEnabled;
 
     /** Checkpoint park nanos. */
-    private final int checkpointParkNanos;
+    private final long checkpointParkNanos;
 
     /**
      * @param slowCheckpointEnabled Slow checkpoint enabled.
      * @param checkpointParkNanos Checkpoint park nanos.
      */
-    public SlowCheckpointFileIOFactory(AtomicBoolean slowCheckpointEnabled, int checkpointParkNanos) {
+    protected AbstractSlowCheckpointFileIOFactory(AtomicBoolean slowCheckpointEnabled, long checkpointParkNanos) {
         this.slowCheckpointEnabled = slowCheckpointEnabled;
         this.checkpointParkNanos = checkpointParkNanos;
     }
@@ -78,9 +78,16 @@ public class SlowCheckpointFileIOFactory implements FileIOFactory {
 
             /** Parks current checkpoint thread if slow mode is enabled. */
             private void parkIfNeeded() {
-                if (slowCheckpointEnabled.get() && Thread.currentThread().getName().contains("db-checkpoint-thread"))
+                if (slowCheckpointEnabled.get() && shouldSlowDownCurrentThread())
                     LockSupport.parkNanos(checkpointParkNanos);
             }
         };
     }
+
+    /**
+     * Returns {@code true} if the current thread should be slowed down.
+     *
+     * @return {@code true} if the current thread should be slowed down
+     */
+    protected abstract boolean shouldSlowDownCurrentThread();
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/CheckpointBufferDeadlockTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/CheckpointBufferDeadlockTest.java
index c573c6c0030..7218be92f65 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/CheckpointBufferDeadlockTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/CheckpointBufferDeadlockTest.java
@@ -88,7 +88,7 @@ public class CheckpointBufferDeadlockTest extends GridCommonAbstractTest {
 
         cfg.setDataStorageConfiguration(
             new DataStorageConfiguration()
-                .setFileIOFactory(new SlowCheckpointFileIOFactory(slowCheckpointEnabled, CHECKPOINT_PARK_NANOS))
+                .setFileIOFactory(new SlowCheckpointMetadataFileIOFactory(slowCheckpointEnabled, CHECKPOINT_PARK_NANOS))
                 .setCheckpointThreads(checkpointThreads)
                 .setDefaultDataRegionConfiguration(
                     new DataRegionConfiguration()
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowCheckpointMetadataFileIOFactory.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowCheckpointMetadataFileIOFactory.java
new file mode 100644
index 00000000000..caea7cc4ad0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowCheckpointMetadataFileIOFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Factory for file I/O that emulates poor checkpoint metadata write speed.
+ */
+public class SlowCheckpointMetadataFileIOFactory extends AbstractSlowCheckpointFileIOFactory {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * @param slowCheckpointEnabled Slow checkpoint enabled.
+     * @param checkpointParkNanos Checkpoint park nanos.
+     */
+    public SlowCheckpointMetadataFileIOFactory(AtomicBoolean slowCheckpointEnabled, long checkpointParkNanos) {
+        super(slowCheckpointEnabled, checkpointParkNanos);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean shouldSlowDownCurrentThread() {
+        return Thread.currentThread().getName().contains("db-checkpoint-thread");
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowCheckpointPagesFileIOFactory.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowCheckpointPagesFileIOFactory.java
new file mode 100644
index 00000000000..6a3ac5eed30
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowCheckpointPagesFileIOFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Factory for file I/O that emulates poor checkpoint pages write speed.
+ */
+public class SlowCheckpointPagesFileIOFactory extends AbstractSlowCheckpointFileIOFactory {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * @param slowCheckpointEnabled Slow checkpoint enabled.
+     * @param checkpointParkNanos Checkpoint park nanos.
+     */
+    public SlowCheckpointPagesFileIOFactory(AtomicBoolean slowCheckpointEnabled, long checkpointParkNanos) {
+        super(slowCheckpointEnabled, checkpointParkNanos);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean shouldSlowDownCurrentThread() {
+        return Thread.currentThread().getName().contains("checkpoint-runner-");
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
index 065d9921f68..57b07d633f9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
@@ -67,13 +67,13 @@ public class IgniteThrottlingUnitTest extends GridCommonAbstractTest {
     public Timeout globalTimeout = Timeout.millis((int)GridTestUtils.DFLT_TEST_TIMEOUT);
 
     /** Logger. */
-    private IgniteLogger log = new NullLogger();
+    private final IgniteLogger log = new NullLogger();
 
     /** Page memory 2 g. */
-    private PageMemoryImpl pageMemory2g = mock(PageMemoryImpl.class);
+    private final PageMemoryImpl pageMemory2g = mock(PageMemoryImpl.class);
 
     /** State checker. */
-    private CheckpointLockStateChecker stateChecker = () -> true;
+    private final CheckpointLockStateChecker stateChecker = () -> true;
 
     /** {@link CheckpointProgress} mock. */
     private final CheckpointProgress progress = mock(CheckpointProgress.class);
@@ -97,37 +97,56 @@ public class IgniteThrottlingUnitTest extends GridCommonAbstractTest {
     }
 
     /**
-     *
+     * Tests that the speed-based throttler throttles when writing faster than target speed, AND the dirty ratio
+     * is above the target ratio.
      */
     @Test
-    public void breakInCaseTooFast() {
+    public void shouldThrottleWhenWritingTooFast() {
         PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, stateChecker, log);
 
-        long time = throttle.getCleanPagesProtectionParkTime(0.67,
+        long parkTime = throttle.getCleanPagesProtectionParkTime(0.67,
             (362584 + 67064) / 2,
             328787,
             1,
             60184,
             23103);
 
-        assertTrue(time > 0);
+        assertTrue(parkTime > 0);
     }
 
     /**
      *
      */
     @Test
-    public void noBreakIfNotFastWrite() {
+    public void shouldNotThrottleWhenWritingSlowly() {
         PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, stateChecker, log);
 
-        long time = throttle.getCleanPagesProtectionParkTime(0.47,
+        long parkTime = throttle.getCleanPagesProtectionParkTime(0.47,
             ((362584 + 67064) / 2),
             328787,
             1,
             20103,
             23103);
 
-        assertEquals(0, time);
+        assertEquals(0, parkTime);
+    }
+
+    /**
+     * Tests that the speed-based throttler does NOT throttle when there are plenty of clean pages, even if writing
+     * faster than the current checkpoint speed.
+     */
+    @Test
+    public void shouldNotThrottleWhenThereArePlentyCleanPages() {
+        PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, stateChecker, log);
+
+        long parkTime = throttle.getCleanPagesProtectionParkTime(0.0,
+            (362584 + 67064) / 2,
+            328787,
+            1,
+            60184,
+            23103);
+
+        assertEquals(0, parkTime);
     }
 
     /**
@@ -140,17 +159,14 @@ public class IgniteThrottlingUnitTest extends GridCommonAbstractTest {
 
         int markDirtySpeed = 34422;
         int cpWriteSpeed = 19416;
-        long time = throttle.getCleanPagesProtectionParkTime(0.04,
+        long time = throttle.getCleanPagesProtectionParkTime(0.67,
             ((903150 + 227217) / 2),
             903150,
             1,
             markDirtySpeed,
             cpWriteSpeed);
 
-        long mdSpeed = TimeUnit.SECONDS.toNanos(1) / markDirtySpeed;
-        long cpSpeed = TimeUnit.SECONDS.toNanos(1) / cpWriteSpeed;
-
-        assertEquals((cpSpeed - mdSpeed), time);
+        assertEquals(415110, time);
     }
 
     /**
@@ -272,7 +288,7 @@ public class IgniteThrottlingUnitTest extends GridCommonAbstractTest {
      *
      */
     @Test
-    public void tooMuchPagesMarkedDirty() {
+    public void doNotThrottleWhenDirtyPagesRatioIsTooHigh() {
         PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, stateChecker, log);
 
         // 363308 350004 348976 10604
@@ -283,8 +299,6 @@ public class IgniteThrottlingUnitTest extends GridCommonAbstractTest {
             279,
             23933);
 
-        System.err.println(time);
-
         assertEquals(0, time);
     }
 
@@ -453,8 +467,6 @@ public class IgniteThrottlingUnitTest extends GridCommonAbstractTest {
                 break;
         }
 
-        System.out.println(throttle.throttleWeight());
-
         assertTrue(warnings.get() > 0);
     }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java
index 3814bddab8f..9a890c54762 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java
@@ -19,8 +19,11 @@ package org.apache.ignite.internal.processors.cache.persistence.pagemem;
 import java.io.Serializable;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.util.concurrent.AtomicDouble;
 import org.apache.ignite.DataRegionMetrics;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
@@ -33,7 +36,10 @@ import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.failure.StopNodeOrHaltFailureHandler;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.Checkpointer;
 import org.apache.ignite.internal.processors.metric.impl.HitRateMetric;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -55,7 +61,7 @@ public class PagesWriteThrottleSandboxTest extends GridCommonAbstractTest {
 
         DataStorageConfiguration dbCfg = new DataStorageConfiguration()
             .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
-                .setMaxSize(4000L * 1024 * 1024)
+                .setMaxSize(1000L * 1024 * 1024)
                 .setCheckpointPageBufferSize(1000L * 1000 * 1000)
                 .setName("dfltDataRegion")
                 .setMetricsEnabled(true)
@@ -132,6 +138,9 @@ public class PagesWriteThrottleSandboxTest extends GridCommonAbstractTest {
             }, 2, "read-loader");
 
             final HitRateMetric putRate = new HitRateMetric("putRate", "", 1000, 5);
+            final AtomicLong putCount = new AtomicLong();
+            final AtomicDouble maxDirtyRatio = new AtomicDouble();
+            long startNanos = System.nanoTime();
 
             GridTestUtils.runAsync(new Runnable() {
                 @Override public void run() {
@@ -142,25 +151,38 @@ public class PagesWriteThrottleSandboxTest extends GridCommonAbstractTest {
                             if (m.getName().equals("dfltDataRegion"))
                                 dirtyPages = m.getDirtyPages();
 
-                        long cpBufPages = 0;
+                        long cpBufPages;
 
                         long cpWrittenPages;
 
-                        AtomicInteger cntr = ((GridCacheDatabaseSharedManager)((ignite(0))
-                            .context().cache().context().database())).getCheckpointer().currentProgress().writtenPagesCounter();
+                        Checkpointer checkpointer = ((GridCacheDatabaseSharedManager)((ignite(0))
+                            .context().cache().context().database())).getCheckpointer();
+                        AtomicInteger cntr = checkpointer.currentProgress().writtenPagesCounter();
 
                         cpWrittenPages = cntr == null ? 0 : cntr.get();
 
                         try {
-                            cpBufPages = ((ignite(0)).context().cache().context().database()
-                                .dataRegion("dfltDataRegion").pageMemory()).checkpointBufferPagesCount();
+                            PageMemoryEx pageMemory = (PageMemoryEx)(ignite(0)).context().cache().context().database()
+                                .dataRegion("dfltDataRegion").pageMemory();
+                            cpBufPages = pageMemory.checkpointBufferPagesCount();
+
+                            if (System.nanoTime() - startNanos > TimeUnit.SECONDS.toNanos(10)) {
+                                double currentDirtyRatio = (double)dirtyPages / pageMemory.totalPages();
+                                double newMaxDirtyRatio = Math.max(maxDirtyRatio.get(), currentDirtyRatio);
+                                maxDirtyRatio.set(newMaxDirtyRatio);
+                            }
                         }
                         catch (IgniteCheckedException e) {
                             e.printStackTrace();
+                            throw new RuntimeException("Something went wrong", e);
                         }
 
-                        System.out.println("@@@ putsPerSec=," + (putRate.value()) + ", getsPerSec=," + (getRate.value()) + ", dirtyPages=,"
-                            + dirtyPages + ", cpWrittenPages=," + cpWrittenPages + ", cpBufPages=," + cpBufPages);
+                        System.out.println("@@@ globalPutsPerSec="
+                            + String.format("%.2f", globalPutsPerSec(putCount, startNanos))
+                            + ", putsPerSec=" + (putRate.value()) + ", getsPerSec=" + (getRate.value()) + ", dirtyPages="
+                            + dirtyPages + ", cpWrittenPages=" + cpWrittenPages + ", cpBufPages=" + cpBufPages
+                            + ", maxDirtyRatio=" + String.format("%.2f", maxDirtyRatio.get())
+                        );
 
                         try {
                             Thread.sleep(1000);
@@ -172,14 +194,27 @@ public class PagesWriteThrottleSandboxTest extends GridCommonAbstractTest {
                 }
             }, "metrics-view");
 
+            final boolean intermittentPutsMode = false;
+
             try (IgniteDataStreamer<Object, Object> ds = ig.dataStreamer(CACHE_NAME)) {
                 ds.allowOverwrite(true);
 
-                for (int i = 0; i < keyCnt * 10; i++) {
-                    ds.addData(ThreadLocalRandom.current().nextInt(keyCnt), new TestValue(ThreadLocalRandom.current().nextInt(),
-                        ThreadLocalRandom.current().nextInt()));
+                while (true) {
+                    long tensOfSecondsPassed = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startNanos) / 10;
+                    if (intermittentPutsMode && tensOfSecondsPassed % 2 == 1) {
+                        System.out.println("... sleeping ...");
+                        Thread.sleep(1000);
+                    }
+                    else {
+                        ds.addData(ThreadLocalRandom.current().nextInt(keyCnt), new TestValue(ThreadLocalRandom.current().nextInt(),
+                            ThreadLocalRandom.current().nextInt()));
 
-                    putRate.increment();
+                        putRate.increment();
+                        putCount.incrementAndGet();
+                    }
+
+                    if (System.nanoTime() - startNanos > TimeUnit.MINUTES.toNanos(10))
+                        break;
                 }
             }
 
@@ -190,6 +225,11 @@ public class PagesWriteThrottleSandboxTest extends GridCommonAbstractTest {
         }
     }
 
+    /** Returns the average number of puts per second since {@code startNanos}. */
+    private double globalPutsPerSec(AtomicLong putCount, long startNanos) {
+        return (double)putCount.get() * TimeUnit.SECONDS.toNanos(1) / (System.nanoTime() - startNanos);
+    }
+
     /**
      *
      */
@@ -247,4 +287,9 @@ public class PagesWriteThrottleSandboxTest extends GridCommonAbstractTest {
         cleanPersistenceDir();
         U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), "snapshot", false));
     }
+
+    /** {@inheritDoc} */
+    @Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
+        return new StopNodeOrHaltFailureHandler();
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
index 4a20d1890a9..19e74c3f1b5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
@@ -31,7 +31,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.cache.persistence.db.SlowCheckpointFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.db.SlowCheckpointMetadataFileIOFactory;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.processors.metric.impl.HitRateMetric;
 import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
@@ -69,7 +69,7 @@ public class PagesWriteThrottleSmokeTest extends GridCommonAbstractTest {
             .setCheckpointFrequency(20_000)
             .setWriteThrottlingEnabled(true)
             .setCheckpointThreads(1)
-            .setFileIOFactory(new SlowCheckpointFileIOFactory(slowCheckpointEnabled, 5_000_000));
+            .setFileIOFactory(new SlowCheckpointMetadataFileIOFactory(slowCheckpointEnabled, 5_000_000));
 
         cfg.setDataStorageConfiguration(dbCfg);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/SpeedBasedThrottleIntegrationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/SpeedBasedThrottleIntegrationTest.java
new file mode 100644
index 00000000000..6439f5e4ec7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/SpeedBasedThrottleIntegrationTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.pagemem;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.cache.persistence.db.SlowCheckpointMetadataFileIOFactory;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+
+/**
+ * Integration tests for {@link PagesWriteSpeedBasedThrottle}.
+ */
+public class SpeedBasedThrottleIntegrationTest extends GridCommonAbstractTest {
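+    /** Listening logger wrapping the test log; the test registers a listener on it to detect throttling messages. */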
+    private final ListeningTestLogger listeningLog = new ListeningTestLogger(log);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        DataStorageConfiguration dbCfg = new DataStorageConfiguration()
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                // Use a small region size so that the need for speed-based throttling is reached easily.
+                .setMaxSize(60 * 1024 * 1024)
+                .setPersistenceEnabled(true)
+            )
+            .setCheckpointFrequency(200)
+            .setWriteThrottlingEnabled(true)
+            .setFileIOFactory(
+                new SlowCheckpointMetadataFileIOFactory(
+                    new AtomicBoolean(true), TimeUnit.MILLISECONDS.toNanos(10000)
+                )
+            );
+
+        return cfg.setDataStorageConfiguration(dbCfg)
+            .setConsistentId(gridName)
+            .setGridLogger(listeningLog);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        stopAllGrids();
+
+        super.beforeTest();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 3 * 60 * 1000;
+    }
+
+    /**
+     */
+    @Test
+    public void speedBasedThrottleShouldBeActivatedWhenNeeded() throws Exception {
+        AtomicBoolean throttled = new AtomicBoolean(false);
+        listeningLog.registerListener(message -> {
+            if (message.startsWith("Throttling is applied to page modifications")) {
+                throttled.set(true);
+            }
+        });
+
+        Ignite ignite = startGrids(1);
+
+        ignite.cluster().state(ACTIVE);
+        IgniteCache<Object, Object> cache = ignite.createCache(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < 1_000_000; i++) {
+            cache.put("key" + i, ThreadLocalRandom.current().nextDouble());
+
+            if (throttled.get()) {
+                break;
+            }
+        }
+
+        assertTrue("Throttling was not triggered", throttled.get());
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/CheckpointTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/CheckpointTest.java
index 386738d7965..62ab1550735 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/CheckpointTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/CheckpointTest.java
@@ -28,7 +28,7 @@ import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.cache.persistence.db.SlowCheckpointFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.db.SlowCheckpointMetadataFileIOFactory;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -64,7 +64,7 @@ public class CheckpointTest extends AbstractPerformanceStatisticsTest {
                 .setMetricsEnabled(true)
                 .setPersistenceEnabled(true))
             .setWriteThrottlingEnabled(true)
-            .setFileIOFactory(new SlowCheckpointFileIOFactory(slowCheckpointEnabled, 500_000))
+            .setFileIOFactory(new SlowCheckpointMetadataFileIOFactory(slowCheckpointEnabled, 500_000))
             .setCheckpointThreads(1));
 
         return cfg;
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java
index aeb1865904c..536c7a782f2 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemor
 import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryNoStoreLeakTest;
 import org.apache.ignite.internal.processors.cache.persistence.pagemem.PagesWriteThrottleSmokeTest;
 import org.apache.ignite.internal.processors.cache.persistence.pagemem.SpeedBasedThrottleBreakdownTest;
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.SpeedBasedThrottleIntegrationTest;
 import org.apache.ignite.internal.processors.cache.persistence.pagemem.UsedPagesMetricTest;
 import org.apache.ignite.internal.processors.cache.persistence.pagemem.UsedPagesMetricTestPersistence;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.TrackingPageIOTest;
@@ -90,6 +91,7 @@ public class IgnitePdsTestSuite5 {
         // Write throttling
         GridTestUtils.addTestIfNeeded(suite, PagesWriteThrottleSmokeTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, SpeedBasedThrottleBreakdownTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, SpeedBasedThrottleIntegrationTest.class, ignoredTests);
 
         // Discovery data handling on node join and old cluster abnormal shutdown
         GridTestUtils.addTestIfNeeded(suite, IgnitePdsDiscoDataHandlingInNewClusterTest.class, ignoredTests);