You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/06/04 08:45:35 UTC
[ignite] branch master updated: IGNITE-16582 Improve behavior of speed-based throttling when dirty pages ratio is low (#9924)
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 4b58545534a IGNITE-16582 Improve behavior of speed-based throttling when dirty pages ratio is low (#9924)
4b58545534a is described below
commit 4b58545534a49a39b8fd60bb560d1bec4ae235fc
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Sat Jun 4 12:45:24 2022 +0400
IGNITE-16582 Improve behavior of speed-based throttling when dirty pages ratio is low (#9924)
---
.../checkpoint/CheckpointProgressImpl.java | 6 +-
.../pagemem/PagesWriteSpeedBasedThrottle.java | 8 +-
...edBasedMemoryConsumptionThrottlingStrategy.java | 170 ++++++++++-----------
...va => AbstractSlowCheckpointFileIOFactory.java} | 17 ++-
.../db/CheckpointBufferDeadlockTest.java | 2 +-
.../db/SlowCheckpointMetadataFileIOFactory.java | 41 +++++
.../db/SlowCheckpointPagesFileIOFactory.java | 41 +++++
.../pagemem/IgniteThrottlingUnitTest.java | 52 ++++---
.../pagemem/PagesWriteThrottleSandboxTest.java | 69 +++++++--
.../pagemem/PagesWriteThrottleSmokeTest.java | 4 +-
.../pagemem/SpeedBasedThrottleIntegrationTest.java | 114 ++++++++++++++
.../performancestatistics/CheckpointTest.java | 4 +-
.../ignite/testsuites/IgnitePdsTestSuite5.java | 2 +
13 files changed, 390 insertions(+), 140 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointProgressImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointProgressImpl.java
index 5c78bccc94e..1f0e7cebd47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointProgressImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointProgressImpl.java
@@ -246,11 +246,11 @@ public class CheckpointProgressImpl implements CheckpointProgress {
}
/** {@inheritDoc} */
- @Override public void updateEvictedPages(int deltha) {
- A.ensure(deltha > 0, "param must be positive");
+ @Override public void updateEvictedPages(int delta) {
+ A.ensure(delta > 0, "param must be positive");
if (evictedPagesCounter() != null)
- evictedPagesCounter().addAndGet(deltha);
+ evictedPagesCounter().addAndGet(delta);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
index 276b448e454..f9dae4a5854 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.cache.persistence.checkpoint.Checkp
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteOutClosure;
+import org.jetbrains.annotations.TestOnly;
/**
* Throttles threads that generate dirty pages during ongoing checkpoint.
@@ -104,7 +105,7 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy {
this.log = log;
cleanPagesProtector = new SpeedBasedMemoryConsumptionThrottlingStrategy(pageMemory, cpProgress,
- markSpeedAndAvgParkTime);
+ markSpeedAndAvgParkTime);
cpBufferWatchdog = new CheckpointBufferOverflowWatchdog(pageMemory);
}
@@ -186,7 +187,7 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy {
if (prevWarnTime.compareAndSet(prevWarningNs, curNs) && log.isInfoEnabled()) {
String msg = String.format("Throttling is applied to page modifications " +
- "[percentOfPartTime=%.2f, markDirty=%d pages/sec, checkpointWrite=%d pages/sec, " +
+ "[fractionOfParkTime=%.2f, markDirty=%d pages/sec, checkpointWrite=%d pages/sec, " +
"estIdealMarkDirty=%d pages/sec, curDirty=%.2f, maxDirty=%.2f, avgParkTime=%d ns, " +
"pages: (total=%d, evicted=%d, written=%d, synced=%d, cpBufUsed=%d, cpBufTotal=%d)]",
weight, getMarkDirtySpeed(), getCpWriteSpeed(),
@@ -210,6 +211,7 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy {
* @param curCpWriteSpeed average checkpoint write speed, pages/sec.
* @return time in nanoseconds to part or 0 if throttling is not required.
*/
+ @TestOnly
long getCleanPagesProtectionParkTime(
double dirtyPagesRatio,
long fullyCompletedPages,
@@ -293,7 +295,7 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy {
if (speed <= 0)
return 0;
- long timeForOnePage = cleanPagesProtector.calcDelayTime(speed);
+ long timeForOnePage = cleanPagesProtector.nsPerOperation(speed);
if (timeForOnePage == 0)
return 0;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/SpeedBasedMemoryConsumptionThrottlingStrategy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/SpeedBasedMemoryConsumptionThrottlingStrategy.java
index 2a5aba62511..9fe7a90eb90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/SpeedBasedMemoryConsumptionThrottlingStrategy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/SpeedBasedMemoryConsumptionThrottlingStrategy.java
@@ -57,7 +57,7 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy {
/**
* Total pages possible to store in page memory.
*/
- private final long totalPages;
+ private volatile long pageMemTotalPages;
/**
* Last estimated speed for marking all clear pages as dirty till the end of checkpoint.
@@ -111,8 +111,6 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy {
this.pageMemory = pageMemory;
this.cpProgress = cpProgress;
this.markSpeedAndAvgParkTime = markSpeedAndAvgParkTime;
-
- totalPages = pageMemory.totalPages();
}
/**
@@ -149,7 +147,7 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy {
final int cpWrittenPages = writtenPagesCounter.get();
final long donePages = cpDonePagesEstimation(cpWrittenPages);
- final long markDirtySpeed = markSpeedAndAvgParkTime.getSpeedOpsPerSec(curNanoTime);
+ final long instantaneousMarkDirtySpeed = markSpeedAndAvgParkTime.getSpeedOpsPerSec(curNanoTime);
// NB: we update progress for speed calculation only in this (clean pages protection) scenario, because
// we only use the computed speed in this same scenario and for reporting in logs (where it's not super
// important to display an ideally accurate speed), but not in the CP Buffer protection scenario.
@@ -157,7 +155,8 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy {
// The progress is set to 0 at the beginning of a checkpoint, so we can be sure that the start time remembered
// in cpWriteSpeed is pretty accurate even without writing to cpWriteSpeed from this method.
cpWriteSpeed.setProgress(donePages, curNanoTime);
- final long curCpWriteSpeed = cpWriteSpeed.getOpsPerSecond(curNanoTime);
+ // TODO: IGNITE-16878 use exponential moving average so that we react to changes faster?
+ final long avgCpWriteSpeed = cpWriteSpeed.getOpsPerSecond(curNanoTime);
final int cpTotalPages = cpTotalPages();
@@ -166,11 +165,11 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy {
// CheckpointProgressImpl.clearCounters() is invoked at the end of a checkpoint (by falling through
// between two volatile assignments). When we get here, we don't have any information about the total
// number of pages in the current CP, so we calculate park time by only using information we have.
- return parkTimeToThrottleByJustCPSpeed(markDirtySpeed, curCpWriteSpeed);
+ return parkTimeToThrottleByJustCPSpeed(instantaneousMarkDirtySpeed, avgCpWriteSpeed);
}
else {
- return speedBasedParkTime(cpWrittenPages, donePages, markDirtySpeed,
- curCpWriteSpeed, cpTotalPages);
+ return speedBasedParkTime(cpWrittenPages, donePages, cpTotalPages, instantaneousMarkDirtySpeed,
+ avgCpWriteSpeed);
}
}
@@ -183,6 +182,9 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy {
* @return estimation of work done (in pages)
*/
private int cpDonePagesEstimation(int cpWrittenPages) {
+ // TODO: IGNITE-16879 - this only works correctly if time-to-write a page is close to time-to-sync a page.
+ // In reality, this does not seem to hold, which produces wrong estimations. We could measure the real times
+ // in Checkpointer and make this estimation a lot more precise.
return (cpWrittenPages + cpSyncedPages()) / 2;
}
@@ -198,15 +200,15 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy {
boolean throttleByCpSpeed = curCpWriteSpeed > 0 && markDirtySpeed > curCpWriteSpeed;
if (throttleByCpSpeed) {
- return calcDelayTime(curCpWriteSpeed);
+ return nsPerOperation(curCpWriteSpeed);
}
return 0;
}
/***/
- private long speedBasedParkTime(int cpWrittenPages, long donePages, long markDirtySpeed,
- long curCpWriteSpeed, int cpTotalPages) {
+ private long speedBasedParkTime(int cpWrittenPages, long donePages, int cpTotalPages,
+ long instantaneousMarkDirtySpeed, long avgCpWriteSpeed) {
final double dirtyPagesRatio = pageMemory.getDirtyPagesRatio();
currDirtyRatio = dirtyPagesRatio;
@@ -220,8 +222,8 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy {
donePages,
notEvictedPagesTotal(cpTotalPages),
threadIdsCount(),
- markDirtySpeed,
- curCpWriteSpeed);
+ instantaneousMarkDirtySpeed,
+ avgCpWriteSpeed);
}
}
@@ -237,8 +239,8 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy {
* @param donePages roughly, written & fsynced pages count.
* @param cpTotalPages total checkpoint scope.
* @param nThreads number of threads providing data during current checkpoint.
- * @param markDirtySpeed registered mark dirty speed, pages/sec.
- * @param curCpWriteSpeed average checkpoint write speed, pages/sec.
+ * @param instantaneousMarkDirtySpeed registered (during approx last second) mark dirty speed, pages/sec.
+ * @param avgCpWriteSpeed average checkpoint write speed, pages/sec.
* @return time in nanoseconds to part or 0 if throttling is not required.
*/
long getParkTime(
@@ -246,84 +248,40 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy {
long donePages,
int cpTotalPages,
int nThreads,
- long markDirtySpeed,
- long curCpWriteSpeed) {
+ long instantaneousMarkDirtySpeed,
+ long avgCpWriteSpeed) {
final long targetSpeedToMarkAll = calcSpeedToMarkAllSpaceTillEndOfCp(dirtyPagesRatio, donePages,
- curCpWriteSpeed, cpTotalPages);
+ avgCpWriteSpeed, cpTotalPages);
final double targetCurrentDirtyRatio = targetCurrentDirtyRatio(donePages, cpTotalPages);
- updateSpeedAndRatio(targetSpeedToMarkAll, targetCurrentDirtyRatio);
-
- long delayByCpWrite = delayIfMarkingFasterThanCPWriteSpeedAllows(markDirtySpeed, curCpWriteSpeed,
- dirtyPagesRatio, nThreads, targetSpeedToMarkAll, targetCurrentDirtyRatio);
- long delayByMarkAllWrite = delayIfMarkingFasterThanTargetSpeedAllows(markDirtySpeed, dirtyPagesRatio, nThreads,
- targetSpeedToMarkAll, targetCurrentDirtyRatio);
-
- return Math.max(delayByCpWrite, delayByMarkAllWrite);
- }
-
- /***/
- private long delayIfMarkingFasterThanCPWriteSpeedAllows(long markDirtySpeed, long curCpWriteSpeed,
- double dirtyPagesRatio, int nThreads,
- long targetSpeedToMarkAll, double targetCurrentDirtyRatio) {
- final double allowedCpWriteSpeedExcessMultiplier = allowedCpWriteSpeedExcessMultiplier(markDirtySpeed,
- dirtyPagesRatio, targetSpeedToMarkAll, targetCurrentDirtyRatio);
- final boolean throttleByCpSpeed = curCpWriteSpeed > 0
- && markDirtySpeed > (allowedCpWriteSpeedExcessMultiplier * curCpWriteSpeed);
-
- if (!throttleByCpSpeed) {
- return 0;
- }
-
- int slowdown = slowdownIfLowSpaceLeft(dirtyPagesRatio, targetCurrentDirtyRatio);
- long nanosecsToMarkOnePage = TimeUnit.SECONDS.toNanos(1) * nThreads / markDirtySpeed;
- long nanosecsToWriteOneCPPage = calcDelayTime(curCpWriteSpeed, nThreads, slowdown);
- return nanosecsToWriteOneCPPage - nanosecsToMarkOnePage;
- }
-
- /***/
- private double allowedCpWriteSpeedExcessMultiplier(long markDirtySpeed, double dirtyPagesRatio,
- long targetSpeedToMarkAll, double targetCurrentDirtyRatio) {
- final boolean lowSpaceLeft = lowCleanSpaceLeft(dirtyPagesRatio, targetCurrentDirtyRatio);
-
- // for case of speedForMarkAll >> markDirtySpeed, allow write little bit faster than CP average
- final double allowWriteFasterThanCp;
- if (markDirtySpeed > 0 && markDirtySpeed < targetSpeedToMarkAll)
- allowWriteFasterThanCp = 0.1 * targetSpeedToMarkAll / markDirtySpeed;
- else if (dirtyPagesRatio > targetCurrentDirtyRatio)
- allowWriteFasterThanCp = 0.0;
- else
- allowWriteFasterThanCp = 0.1;
-
- return lowSpaceLeft
- ? 1.0
- : 1.0 + allowWriteFasterThanCp;
- }
-
- /***/
- private int slowdownIfLowSpaceLeft(double dirtyPagesRatio, double targetCurrentDirtyRatio) {
- boolean lowSpaceLeft = lowCleanSpaceLeft(dirtyPagesRatio, targetCurrentDirtyRatio);
- return slowdownIfLowSpaceLeft(lowSpaceLeft);
- }
+ publishSpeedAndRatioForMetrics(targetSpeedToMarkAll, targetCurrentDirtyRatio);
- /***/
- private int slowdownIfLowSpaceLeft(boolean lowSpaceLeft) {
- return lowSpaceLeft ? 3 : 1;
+ return delayIfMarkingFasterThanTargetSpeedAllows(instantaneousMarkDirtySpeed,
+ dirtyPagesRatio, nThreads, targetSpeedToMarkAll, targetCurrentDirtyRatio);
}
/***/
- private long delayIfMarkingFasterThanTargetSpeedAllows(long markDirtySpeed, double dirtyPagesRatio, int nThreads,
+ private long delayIfMarkingFasterThanTargetSpeedAllows(long instantaneousMarkDirtySpeed, double dirtyPagesRatio,
+ int nThreads,
long targetSpeedToMarkAll, double targetCurrentDirtyRatio) {
final boolean lowSpaceLeft = lowCleanSpaceLeft(dirtyPagesRatio, targetCurrentDirtyRatio);
final int slowdown = slowdownIfLowSpaceLeft(lowSpaceLeft);
double multiplierForSpeedToMarkAll = lowSpaceLeft ? 0.8 : 1.0;
boolean markingTooFastNow = targetSpeedToMarkAll > 0
- && markDirtySpeed > multiplierForSpeedToMarkAll * targetSpeedToMarkAll;
+ && instantaneousMarkDirtySpeed > multiplierForSpeedToMarkAll * targetSpeedToMarkAll;
boolean markedTooFastSinceCPStart = dirtyPagesRatio > targetCurrentDirtyRatio;
boolean markingTooFast = markedTooFastSinceCPStart && markingTooFastNow;
- return markingTooFast ? calcDelayTime(targetSpeedToMarkAll, nThreads, slowdown) : 0;
+
+ // We must NOT subtract nsPerOperation(instantaneousMarkDirtySpeed, nThreads)! If we do, the actual speed
+ // converges to a value that is 1-2 times higher than the target speed.
+ return markingTooFast ? nsPerOperation(targetSpeedToMarkAll, nThreads, slowdown) : 0;
+ }
+
+ /***/
+ private int slowdownIfLowSpaceLeft(boolean lowSpaceLeft) {
+ return lowSpaceLeft ? 3 : 1;
}
/**
@@ -338,9 +296,9 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy {
}
/***/
- private void updateSpeedAndRatio(long speedForMarkAll, double targetDirtyRatio) {
- this.speedForMarkAll = speedForMarkAll; //publish for metrics
- this.targetDirtyRatio = targetDirtyRatio; //publish for metrics
+ private void publishSpeedAndRatioForMetrics(long speedForMarkAll, double targetDirtyRatio) {
+ this.speedForMarkAll = speedForMarkAll;
+ this.targetDirtyRatio = targetDirtyRatio;
}
/**
@@ -351,14 +309,14 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy {
*
* @param dirtyPagesRatio current percent of dirty pages.
* @param donePages roughly, count of written and sync'ed pages
- * @param curCpWriteSpeed pages/second checkpoint write speed. 0 speed means 'no data'.
+ * @param avgCpWriteSpeed pages/second checkpoint write speed. 0 speed means 'no data'.
* @param cpTotalPages total pages in checkpoint.
* @return pages/second to mark to mark all clean pages as dirty till the end of checkpoint. 0 speed means 'no
* data', or when we are not going to throttle due to the current dirty pages ratio being too high
*/
private long calcSpeedToMarkAllSpaceTillEndOfCp(double dirtyPagesRatio, long donePages,
- long curCpWriteSpeed, int cpTotalPages) {
- if (curCpWriteSpeed == 0)
+ long avgCpWriteSpeed, int cpTotalPages) {
+ if (avgCpWriteSpeed == 0)
return 0;
if (cpTotalPages <= 0)
@@ -367,11 +325,30 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy {
if (dirtyPagesRatio >= MAX_DIRTY_PAGES)
return 0;
- double remainedClearPages = (MAX_DIRTY_PAGES - dirtyPagesRatio) * totalPages;
+ // IDEA: here, when calculating the count of clean pages, it includes the pages under checkpoint. It is kinda
+ // legal because they can be written (using the Checkpoint Buffer to make a copy of the value to be
+ // checkpointed), but the CP Buffer is usually not too big, and if it gets nearly filled, writes become
+ // throttled really hard by exponential throttler. Maybe we should subtract the number of not-yet-written-by-CP
+ // pages from the count of clean pages? In such a case, we would lessen the risk of CP Buffer-caused throttling.
+ double remainedCleanPages = (MAX_DIRTY_PAGES - dirtyPagesRatio) * pageMemTotalPages();
+
+ double secondsTillCPEnd = 1.0 * (cpTotalPages - donePages) / avgCpWriteSpeed;
+
+ return (long)(remainedCleanPages / secondsTillCPEnd);
+ }
+
+ /** Returns total number of pages storable in page memory. */
+ private long pageMemTotalPages() {
+ long currentTotalPages = pageMemTotalPages;
+
+ if (currentTotalPages == 0) {
+ currentTotalPages = pageMemory.totalPages();
+ pageMemTotalPages = currentTotalPages;
+ }
- double secondsTillCPEnd = 1.0 * (cpTotalPages - donePages) / curCpWriteSpeed;
+ assert currentTotalPages > 0 : "PageMemory.totalPages() is still 0";
- return (long)(remainedClearPages / secondsTillCPEnd);
+ return currentTotalPages;
}
/**
@@ -457,24 +434,33 @@ class SpeedBasedMemoryConsumptionThrottlingStrategy {
* @param baseSpeed speed to slow down.
* @return sleep time in nanoseconds.
*/
- long calcDelayTime(long baseSpeed) {
- return calcDelayTime(baseSpeed, threadIdsCount(), 1);
+ long nsPerOperation(long baseSpeed) {
+ return nsPerOperation(baseSpeed, threadIdsCount());
}
/**
- * @param baseSpeed speed to slow down.
+ * @param speedPagesPerSec speed to slow down.
+ * @param nThreads operating threads.
+ * @return sleep time in nanoseconds.
+ */
+ private long nsPerOperation(long speedPagesPerSec, int nThreads) {
+ return nsPerOperation(speedPagesPerSec, nThreads, 1);
+ }
+
+ /**
+ * @param speedPagesPerSec speed to slow down.
* @param nThreads operating threads.
* @param factor how much it is needed to slowdown base speed. 1 means delay to get exact base speed.
* @return sleep time in nanoseconds.
*/
- private long calcDelayTime(long baseSpeed, int nThreads, int factor) {
+ private long nsPerOperation(long speedPagesPerSec, int nThreads, int factor) {
if (factor <= 0)
throw new IllegalStateException("Coefficient should be positive");
- if (baseSpeed <= 0)
+ if (speedPagesPerSec <= 0)
return 0;
- long updTimeNsForOnePage = TimeUnit.SECONDS.toNanos(1) * nThreads / (baseSpeed);
+ long updTimeNsForOnePage = TimeUnit.SECONDS.toNanos(1) * nThreads / (speedPagesPerSec);
return factor * updTimeNsForOnePage;
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowCheckpointFileIOFactory.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/AbstractSlowCheckpointFileIOFactory.java
similarity index 83%
rename from modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowCheckpointFileIOFactory.java
rename to modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/AbstractSlowCheckpointFileIOFactory.java
index cad6b568f77..1a6f0f7b2ff 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowCheckpointFileIOFactory.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/AbstractSlowCheckpointFileIOFactory.java
@@ -29,9 +29,9 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactor
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
/**
- * Create File I/O that emulates poor checkpoint write speed.
+ * File I/O that emulates poor checkpoint write speed.
*/
-public class SlowCheckpointFileIOFactory implements FileIOFactory {
+public abstract class AbstractSlowCheckpointFileIOFactory implements FileIOFactory {
/** Serial version uid. */
private static final long serialVersionUID = 0L;
@@ -42,13 +42,13 @@ public class SlowCheckpointFileIOFactory implements FileIOFactory {
private final AtomicBoolean slowCheckpointEnabled;
/** Checkpoint park nanos. */
- private final int checkpointParkNanos;
+ private final long checkpointParkNanos;
/**
* @param slowCheckpointEnabled Slow checkpoint enabled.
* @param checkpointParkNanos Checkpoint park nanos.
*/
- public SlowCheckpointFileIOFactory(AtomicBoolean slowCheckpointEnabled, int checkpointParkNanos) {
+ protected AbstractSlowCheckpointFileIOFactory(AtomicBoolean slowCheckpointEnabled, long checkpointParkNanos) {
this.slowCheckpointEnabled = slowCheckpointEnabled;
this.checkpointParkNanos = checkpointParkNanos;
}
@@ -78,9 +78,16 @@ public class SlowCheckpointFileIOFactory implements FileIOFactory {
/** Parks current checkpoint thread if slow mode is enabled. */
private void parkIfNeeded() {
- if (slowCheckpointEnabled.get() && Thread.currentThread().getName().contains("db-checkpoint-thread"))
+ if (slowCheckpointEnabled.get() && shouldSlowDownCurrentThread())
LockSupport.parkNanos(checkpointParkNanos);
}
};
}
+
+ /**
+ * Returns {@code true} if the current thread should be slowed down.
+ *
+ * @return {@code true} if the current thread should be slowed down
+ */
+ protected abstract boolean shouldSlowDownCurrentThread();
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/CheckpointBufferDeadlockTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/CheckpointBufferDeadlockTest.java
index c573c6c0030..7218be92f65 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/CheckpointBufferDeadlockTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/CheckpointBufferDeadlockTest.java
@@ -88,7 +88,7 @@ public class CheckpointBufferDeadlockTest extends GridCommonAbstractTest {
cfg.setDataStorageConfiguration(
new DataStorageConfiguration()
- .setFileIOFactory(new SlowCheckpointFileIOFactory(slowCheckpointEnabled, CHECKPOINT_PARK_NANOS))
+ .setFileIOFactory(new SlowCheckpointMetadataFileIOFactory(slowCheckpointEnabled, CHECKPOINT_PARK_NANOS))
.setCheckpointThreads(checkpointThreads)
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration()
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowCheckpointMetadataFileIOFactory.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowCheckpointMetadataFileIOFactory.java
new file mode 100644
index 00000000000..caea7cc4ad0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowCheckpointMetadataFileIOFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * File I/O that emulates poor checkpoint metadata write speed.
+ */
+public class SlowCheckpointMetadataFileIOFactory extends AbstractSlowCheckpointFileIOFactory {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * @param slowCheckpointEnabled Slow checkpoint enabled.
+ * @param checkpointParkNanos Checkpoint park nanos.
+ */
+ public SlowCheckpointMetadataFileIOFactory(AtomicBoolean slowCheckpointEnabled, long checkpointParkNanos) {
+ super(slowCheckpointEnabled, checkpointParkNanos);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean shouldSlowDownCurrentThread() {
+ return Thread.currentThread().getName().contains("db-checkpoint-thread");
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowCheckpointPagesFileIOFactory.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowCheckpointPagesFileIOFactory.java
new file mode 100644
index 00000000000..6a3ac5eed30
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowCheckpointPagesFileIOFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * File I/O that emulates poor checkpoint pages write speed.
+ */
+public class SlowCheckpointPagesFileIOFactory extends AbstractSlowCheckpointFileIOFactory {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * @param slowCheckpointEnabled Slow checkpoint enabled.
+ * @param checkpointParkNanos Checkpoint park nanos.
+ */
+ public SlowCheckpointPagesFileIOFactory(AtomicBoolean slowCheckpointEnabled, long checkpointParkNanos) {
+ super(slowCheckpointEnabled, checkpointParkNanos);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean shouldSlowDownCurrentThread() {
+ return Thread.currentThread().getName().contains("checkpoint-runner-");
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
index 065d9921f68..57b07d633f9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
@@ -67,13 +67,13 @@ public class IgniteThrottlingUnitTest extends GridCommonAbstractTest {
public Timeout globalTimeout = Timeout.millis((int)GridTestUtils.DFLT_TEST_TIMEOUT);
/** Logger. */
- private IgniteLogger log = new NullLogger();
+ private final IgniteLogger log = new NullLogger();
/** Page memory 2 g. */
- private PageMemoryImpl pageMemory2g = mock(PageMemoryImpl.class);
+ private final PageMemoryImpl pageMemory2g = mock(PageMemoryImpl.class);
/** State checker. */
- private CheckpointLockStateChecker stateChecker = () -> true;
+ private final CheckpointLockStateChecker stateChecker = () -> true;
/** {@link CheckpointProgress} mock. */
private final CheckpointProgress progress = mock(CheckpointProgress.class);
@@ -97,37 +97,56 @@ public class IgniteThrottlingUnitTest extends GridCommonAbstractTest {
}
/**
- *
+ * Tests that the speed-based throttler throttles when writing faster than target speed, AND the dirty ratio
+ * is above the target ratio.
*/
@Test
- public void breakInCaseTooFast() {
+ public void shouldThrottleWhenWritingTooFast() {
PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, stateChecker, log);
- long time = throttle.getCleanPagesProtectionParkTime(0.67,
+ long parkTime = throttle.getCleanPagesProtectionParkTime(0.67,
(362584 + 67064) / 2,
328787,
1,
60184,
23103);
- assertTrue(time > 0);
+ assertTrue(parkTime > 0);
}
/**
*
*/
@Test
- public void noBreakIfNotFastWrite() {
+ public void shouldNotThrottleWhenWritingSlowly() {
PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, stateChecker, log);
- long time = throttle.getCleanPagesProtectionParkTime(0.47,
+ long parkTime = throttle.getCleanPagesProtectionParkTime(0.47,
((362584 + 67064) / 2),
328787,
1,
20103,
23103);
- assertEquals(0, time);
+ assertEquals(0, parkTime);
+ }
+
+ /**
+ * Tests that the speed-based throttler does NOT throttle when there are plenty clean pages, even if writing
+ * faster than the current checkpoint speed.
+ */
+ @Test
+ public void shouldNotThrottleWhenThereArePlentyCleanPages() {
+ PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, stateChecker, log);
+
+ long parkTime = throttle.getCleanPagesProtectionParkTime(0.0,
+ (362584 + 67064) / 2,
+ 328787,
+ 1,
+ 60184,
+ 23103);
+
+ assertEquals(0, parkTime);
}
/**
@@ -140,17 +159,14 @@ public class IgniteThrottlingUnitTest extends GridCommonAbstractTest {
int markDirtySpeed = 34422;
int cpWriteSpeed = 19416;
- long time = throttle.getCleanPagesProtectionParkTime(0.04,
+ long time = throttle.getCleanPagesProtectionParkTime(0.67,
((903150 + 227217) / 2),
903150,
1,
markDirtySpeed,
cpWriteSpeed);
- long mdSpeed = TimeUnit.SECONDS.toNanos(1) / markDirtySpeed;
- long cpSpeed = TimeUnit.SECONDS.toNanos(1) / cpWriteSpeed;
-
- assertEquals((cpSpeed - mdSpeed), time);
+ assertEquals(415110, time);
}
/**
@@ -272,7 +288,7 @@ public class IgniteThrottlingUnitTest extends GridCommonAbstractTest {
*
*/
@Test
- public void tooMuchPagesMarkedDirty() {
+ public void doNotThrottleWhenDirtyPagesRatioIsTooHigh() {
PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, stateChecker, log);
// 363308 350004 348976 10604
@@ -283,8 +299,6 @@ public class IgniteThrottlingUnitTest extends GridCommonAbstractTest {
279,
23933);
- System.err.println(time);
-
assertEquals(0, time);
}
@@ -453,8 +467,6 @@ public class IgniteThrottlingUnitTest extends GridCommonAbstractTest {
break;
}
- System.out.println(throttle.throttleWeight());
-
assertTrue(warnings.get() > 0);
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java
index 3814bddab8f..9a890c54762 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java
@@ -19,8 +19,11 @@ package org.apache.ignite.internal.processors.cache.persistence.pagemem;
import java.io.Serializable;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.util.concurrent.AtomicDouble;
import org.apache.ignite.DataRegionMetrics;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
@@ -33,7 +36,10 @@ import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.failure.StopNodeOrHaltFailureHandler;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.Checkpointer;
import org.apache.ignite.internal.processors.metric.impl.HitRateMetric;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -55,7 +61,7 @@ public class PagesWriteThrottleSandboxTest extends GridCommonAbstractTest {
DataStorageConfiguration dbCfg = new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
- .setMaxSize(4000L * 1024 * 1024)
+ .setMaxSize(1000L * 1024 * 1024)
.setCheckpointPageBufferSize(1000L * 1000 * 1000)
.setName("dfltDataRegion")
.setMetricsEnabled(true)
@@ -132,6 +138,9 @@ public class PagesWriteThrottleSandboxTest extends GridCommonAbstractTest {
}, 2, "read-loader");
final HitRateMetric putRate = new HitRateMetric("putRate", "", 1000, 5);
+ final AtomicLong putCount = new AtomicLong();
+ final AtomicDouble maxDirtyRatio = new AtomicDouble();
+ long startNanos = System.nanoTime();
GridTestUtils.runAsync(new Runnable() {
@Override public void run() {
@@ -142,25 +151,38 @@ public class PagesWriteThrottleSandboxTest extends GridCommonAbstractTest {
if (m.getName().equals("dfltDataRegion"))
dirtyPages = m.getDirtyPages();
- long cpBufPages = 0;
+ long cpBufPages;
long cpWrittenPages;
- AtomicInteger cntr = ((GridCacheDatabaseSharedManager)((ignite(0))
- .context().cache().context().database())).getCheckpointer().currentProgress().writtenPagesCounter();
+ Checkpointer checkpointer = ((GridCacheDatabaseSharedManager)((ignite(0))
+ .context().cache().context().database())).getCheckpointer();
+ AtomicInteger cntr = checkpointer.currentProgress().writtenPagesCounter();
cpWrittenPages = cntr == null ? 0 : cntr.get();
try {
- cpBufPages = ((ignite(0)).context().cache().context().database()
- .dataRegion("dfltDataRegion").pageMemory()).checkpointBufferPagesCount();
+ PageMemoryEx pageMemory = (PageMemoryEx)(ignite(0)).context().cache().context().database()
+ .dataRegion("dfltDataRegion").pageMemory();
+ cpBufPages = pageMemory.checkpointBufferPagesCount();
+
+ if (System.nanoTime() - startNanos > TimeUnit.SECONDS.toNanos(10)) {
+ double currentDirtyRatio = (double)dirtyPages / pageMemory.totalPages();
+ double newMaxDirtyRatio = Math.max(maxDirtyRatio.get(), currentDirtyRatio);
+ maxDirtyRatio.set(newMaxDirtyRatio);
+ }
}
catch (IgniteCheckedException e) {
e.printStackTrace();
+ throw new RuntimeException("Something went wrong", e);
}
- System.out.println("@@@ putsPerSec=," + (putRate.value()) + ", getsPerSec=," + (getRate.value()) + ", dirtyPages=,"
- + dirtyPages + ", cpWrittenPages=," + cpWrittenPages + ", cpBufPages=," + cpBufPages);
+ System.out.println("@@@ globalPutsPerSec="
+ + String.format("%.2f", globalPutsPerSec(putCount, startNanos))
+ + ", putsPerSec=" + (putRate.value()) + ", getsPerSec=" + (getRate.value()) + ", dirtyPages="
+ + dirtyPages + ", cpWrittenPages=" + cpWrittenPages + ", cpBufPages=" + cpBufPages
+ + ", maxDirtyRatio=" + String.format("%.2f", maxDirtyRatio.get())
+ );
try {
Thread.sleep(1000);
@@ -172,14 +194,27 @@ public class PagesWriteThrottleSandboxTest extends GridCommonAbstractTest {
}
}, "metrics-view");
+ final boolean intermittentPutsMode = false;
+
try (IgniteDataStreamer<Object, Object> ds = ig.dataStreamer(CACHE_NAME)) {
ds.allowOverwrite(true);
- for (int i = 0; i < keyCnt * 10; i++) {
- ds.addData(ThreadLocalRandom.current().nextInt(keyCnt), new TestValue(ThreadLocalRandom.current().nextInt(),
- ThreadLocalRandom.current().nextInt()));
+ while (true) {
+ long tensOfSecondsPassed = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startNanos) / 10;
+ if (intermittentPutsMode && tensOfSecondsPassed % 2 == 1) {
+ System.out.println("... sleeping ...");
+ Thread.sleep(1000);
+ }
+ else {
+ ds.addData(ThreadLocalRandom.current().nextInt(keyCnt), new TestValue(ThreadLocalRandom.current().nextInt(),
+ ThreadLocalRandom.current().nextInt()));
- putRate.increment();
+ putRate.increment();
+ putCount.incrementAndGet();
+ }
+
+ if (System.nanoTime() - startNanos > TimeUnit.MINUTES.toNanos(10))
+ break;
}
}
@@ -190,6 +225,11 @@ public class PagesWriteThrottleSandboxTest extends GridCommonAbstractTest {
}
}
+ /***/
+ private double globalPutsPerSec(AtomicLong putCount, long startNanos) {
+ return (double)putCount.get() * TimeUnit.SECONDS.toNanos(1) / (System.nanoTime() - startNanos);
+ }
+
/**
*
*/
@@ -247,4 +287,9 @@ public class PagesWriteThrottleSandboxTest extends GridCommonAbstractTest {
cleanPersistenceDir();
U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), "snapshot", false));
}
+
+ /** {@inheritDoc} */
+ @Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
+ return new StopNodeOrHaltFailureHandler();
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
index 4a20d1890a9..19e74c3f1b5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
@@ -31,7 +31,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.cache.persistence.db.SlowCheckpointFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.db.SlowCheckpointMetadataFileIOFactory;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.HitRateMetric;
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
@@ -69,7 +69,7 @@ public class PagesWriteThrottleSmokeTest extends GridCommonAbstractTest {
.setCheckpointFrequency(20_000)
.setWriteThrottlingEnabled(true)
.setCheckpointThreads(1)
- .setFileIOFactory(new SlowCheckpointFileIOFactory(slowCheckpointEnabled, 5_000_000));
+ .setFileIOFactory(new SlowCheckpointMetadataFileIOFactory(slowCheckpointEnabled, 5_000_000));
cfg.setDataStorageConfiguration(dbCfg);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/SpeedBasedThrottleIntegrationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/SpeedBasedThrottleIntegrationTest.java
new file mode 100644
index 00000000000..6439f5e4ec7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/SpeedBasedThrottleIntegrationTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.pagemem;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.cache.persistence.db.SlowCheckpointMetadataFileIOFactory;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+
+/**
+ * Integration tests for {@link PagesWriteSpeedBasedThrottle}.
+ */
+public class SpeedBasedThrottleIntegrationTest extends GridCommonAbstractTest {
+ /***/
+ private final ListeningTestLogger listeningLog = new ListeningTestLogger(log);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ DataStorageConfiguration dbCfg = new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+ // set small region size to make it easy achieve the necessity to throttle with speed-based throttle
+ .setMaxSize(60 * 1024 * 1024)
+ .setPersistenceEnabled(true)
+ )
+ .setCheckpointFrequency(200)
+ .setWriteThrottlingEnabled(true)
+ .setFileIOFactory(
+ new SlowCheckpointMetadataFileIOFactory(
+ new AtomicBoolean(true), TimeUnit.MILLISECONDS.toNanos(10000)
+ )
+ );
+
+ return cfg.setDataStorageConfiguration(dbCfg)
+ .setConsistentId(gridName)
+ .setGridLogger(listeningLog);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ stopAllGrids();
+
+ super.beforeTest();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 3 * 60 * 1000;
+ }
+
+ /**
+ */
+ @Test
+ public void speedBasedThrottleShouldBeActivatedWhenNeeded() throws Exception {
+ AtomicBoolean throttled = new AtomicBoolean(false);
+ listeningLog.registerListener(message -> {
+ if (message.startsWith("Throttling is applied to page modifications")) {
+ throttled.set(true);
+ }
+ });
+
+ Ignite ignite = startGrids(1);
+
+ ignite.cluster().state(ACTIVE);
+ IgniteCache<Object, Object> cache = ignite.createCache(DEFAULT_CACHE_NAME);
+
+ for (int i = 0; i < 1_000_000; i++) {
+ cache.put("key" + i, ThreadLocalRandom.current().nextDouble());
+
+ if (throttled.get()) {
+ break;
+ }
+ }
+
+ assertTrue("Throttling was not triggered", throttled.get());
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/CheckpointTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/CheckpointTest.java
index 386738d7965..62ab1550735 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/CheckpointTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/CheckpointTest.java
@@ -28,7 +28,7 @@ import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.cache.persistence.db.SlowCheckpointFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.db.SlowCheckpointMetadataFileIOFactory;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -64,7 +64,7 @@ public class CheckpointTest extends AbstractPerformanceStatisticsTest {
.setMetricsEnabled(true)
.setPersistenceEnabled(true))
.setWriteThrottlingEnabled(true)
- .setFileIOFactory(new SlowCheckpointFileIOFactory(slowCheckpointEnabled, 500_000))
+ .setFileIOFactory(new SlowCheckpointMetadataFileIOFactory(slowCheckpointEnabled, 500_000))
.setCheckpointThreads(1));
return cfg;
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java
index aeb1865904c..536c7a782f2 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemor
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryNoStoreLeakTest;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PagesWriteThrottleSmokeTest;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.SpeedBasedThrottleBreakdownTest;
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.SpeedBasedThrottleIntegrationTest;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.UsedPagesMetricTest;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.UsedPagesMetricTestPersistence;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.TrackingPageIOTest;
@@ -90,6 +91,7 @@ public class IgnitePdsTestSuite5 {
// Write throttling
GridTestUtils.addTestIfNeeded(suite, PagesWriteThrottleSmokeTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, SpeedBasedThrottleBreakdownTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, SpeedBasedThrottleIntegrationTest.class, ignoredTests);
// Discovery data handling on node join and old cluster abnormal shutdown
GridTestUtils.addTestIfNeeded(suite, IgnitePdsDiscoDataHandlingInNewClusterTest.class, ignoredTests);