You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/02/22 07:22:24 UTC
[ignite] branch master updated: IGNITE-16600 Implement tryWakeupThrottledThreads and shouldThrottle for speed based throttle (#9840)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 5f52f01 IGNITE-16600 Implement tryWakeupThrottledThreads and shouldThrottle for speed based throttle (#9840)
5f52f01 is described below
commit 5f52f010b415fce14d9c1e74365d77b6713e73f2
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Tue Feb 22 11:20:44 2022 +0400
IGNITE-16600 Implement tryWakeupThrottledThreads and shouldThrottle for speed based throttle (#9840)
Co-authored-by: Anton Kalashnikov <ka...@yandex.ru>
---
.../pagemem/PagesWriteSpeedBasedThrottle.java | 30 ++++++-
.../persistence/pagemem/PagesWriteThrottle.java | 3 -
.../pagemem/PagesWriteThrottlePolicy.java | 3 +
.../pagemem/IgniteThrottlingUnitTest.java | 92 ++++++++++++++++++----
.../persistence/pagemem/PageMemoryImplTest.java | 21 +++--
5 files changed, 123 insertions(+), 26 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
index a0ae35f..afa7cfd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
@@ -85,6 +85,9 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy {
/** Threads set. Contains identifiers of all threads which were marking pages for current checkpoint. */
private final GridConcurrentHashSet<Long> threadIds = new GridConcurrentHashSet<>();
+ /** Threads set. Contains threads which is currently parked because of throttling. */
+ private final GridConcurrentHashSet<Thread> parkedThreads = new GridConcurrentHashSet<>();
+
/**
* Used for calculating speed of marking pages dirty.
* Value from past 750-1000 millis only.
@@ -244,7 +247,14 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy {
+ " for timeout(ms)=" + (throttleParkTimeNs / 1_000_000));
}
- LockSupport.parkNanos(throttleParkTimeNs);
+ parkedThreads.add(Thread.currentThread());
+
+ try {
+ LockSupport.parkNanos(throttleParkTimeNs);
+ }
+ finally {
+ parkedThreads.remove(Thread.currentThread());
+ }
}
/**
@@ -469,6 +479,7 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy {
speedCpWrite.finishInterval();
speedMarkAndAvgParkTime.finishInterval();
threadIds.clear();
+ parkedThreads.forEach(LockSupport::unpark);
}
/**
@@ -520,6 +531,7 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy {
/**
* Measurement shows how much throttling time is involved into average marking time.
+ *
* @return metric started from 0.0 and showing how much throttling is involved into current marking process.
*/
public double throttleWeight() {
@@ -536,6 +548,22 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy {
return 1.0 * throttleParkTime() / timeForOnePage;
}
+ /** {@inheritDoc} */
+ @Override public void tryWakeupThrottledThreads() {
+ if (!shouldThrottle()) {
+ exponentialBackoffCntr.set(0);
+
+ parkedThreads.forEach(LockSupport::unpark);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean shouldThrottle() {
+ int checkpointBufLimit = (int)(pageMemory.checkpointBufferPagesSize() * CP_BUF_FILL_THRESHOLD);
+
+ return pageMemory.checkpointBufferPagesCount() > checkpointBufLimit;
+ }
+
/**
* Throttling mode for page.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
index ef2fce4..6d62234 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
@@ -48,9 +48,6 @@ public class PagesWriteThrottle implements PagesWriteThrottlePolicy {
/** Backoff ratio. Each next park will be this times longer. */
private static final double BACKOFF_RATIO = 1.05;
- /** Checkpoint buffer fullfill upper bound. */
- private static final float CP_BUF_FILL_THRESHOLD = 2f / 3;
-
/** Counter for dirty pages ratio throttling. */
private final AtomicInteger notInCheckpointBackoffCntr = new AtomicInteger(0);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java
index d4ca3c8..406cab4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java
@@ -33,6 +33,9 @@ public interface PagesWriteThrottlePolicy {
public long LOGGING_THRESHOLD = TimeUnit.SECONDS.toNanos(
IgniteSystemProperties.getInteger(IGNITE_THROTTLE_LOG_THRESHOLD, DFLT_THROTTLE_LOG_THRESHOLD));
+ /** Checkpoint buffer fullfill upper bound. */
+ static final float CP_BUF_FILL_THRESHOLD = 2f / 3;
+
/**
* Callback to apply throttling delay.
* @param isPageInCheckpoint flag indicating if current page is in scope of current checkpoint.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
index 8d2c613..d1e9256 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
@@ -38,17 +38,15 @@ import org.apache.ignite.logger.NullLogger;
import org.apache.ignite.spi.metric.noop.NoopMetricExporterSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.GridTestKernalContext;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.logger.GridTestLog4jLogger;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
-import org.mockito.Mockito;
import static java.lang.Thread.State.TIMED_WAITING;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@@ -57,7 +55,7 @@ import static org.mockito.Mockito.when;
/**
*
*/
-public class IgniteThrottlingUnitTest {
+public class IgniteThrottlingUnitTest extends GridCommonAbstractTest {
/** Per test timeout */
@Rule
public Timeout globalTimeout = new Timeout((int)GridTestUtils.DFLT_TEST_TIMEOUT);
@@ -121,8 +119,8 @@ public class IgniteThrottlingUnitTest {
}
/**
- * Test that time to park is calculated according to both cpSpeed and mark dirty speed (in case if
- * checkpoint buffer is not full).
+ * Test that time to park is calculated according to both cpSpeed and mark dirty speed (in case if checkpoint buffer
+ * is not full).
*/
@Test
public void testCorrectTimeToPark() {
@@ -131,11 +129,11 @@ public class IgniteThrottlingUnitTest {
int markDirtySpeed = 34422;
int cpWriteSpeed = 19416;
long time = throttle.getParkTime(0.04,
- ((903150 + 227217) / 2),
- 903150,
- 1,
- markDirtySpeed,
- cpWriteSpeed);
+ ((903150 + 227217) / 2),
+ 903150,
+ 1,
+ markDirtySpeed,
+ cpWriteSpeed);
long mdSpeed = TimeUnit.SECONDS.toNanos(1) / markDirtySpeed;
long cpSpeed = TimeUnit.SECONDS.toNanos(1) / cpWriteSpeed;
@@ -278,7 +276,69 @@ public class IgniteThrottlingUnitTest {
assertTrue(time == 0);
}
- /** */
+ /**
+ * @throws IgniteInterruptedCheckedException if fail.
+ */
+ @Test
+ public void wakeupSpeedBaseThrottledThreadOnCheckpointFinish() throws IgniteInterruptedCheckedException {
+ //given: Enabled throttling with EXPONENTIAL level.
+ CheckpointProgressImpl cl0 = mock(CheckpointProgressImpl.class);
+ when(cl0.writtenPagesCounter()).thenReturn(new AtomicInteger(200));
+
+ IgniteOutClosure<CheckpointProgress> cpProgress = mock(IgniteOutClosure.class);
+ when(cpProgress.apply()).thenReturn(cl0);
+
+ PagesWriteThrottlePolicy plc = new PagesWriteSpeedBasedThrottle(pageMemory2g, cpProgress, stateChecker, log) {
+ @Override protected void doPark(long throttleParkTimeNs) {
+ //Force parking to long time.
+ super.doPark(TimeUnit.SECONDS.toNanos(1));
+ }
+ };
+
+ when(pageMemory2g.checkpointBufferPagesSize()).thenReturn(100);
+ when(pageMemory2g.checkpointBufferPagesCount()).thenAnswer(mock -> 70);
+
+ AtomicBoolean stopLoad = new AtomicBoolean();
+ List<Thread> loadThreads = new ArrayList<>();
+
+ for (int i = 0; i < 10; i++) {
+ loadThreads.add(new Thread(
+ () -> {
+ while (!stopLoad.get())
+ plc.onMarkDirty(true);
+ },
+ "load-" + i
+ ));
+ }
+
+ try {
+ loadThreads.forEach(Thread::start);
+
+ //and: All load threads are parked.
+ for (Thread t : loadThreads)
+ assertTrue(t.getName(), waitForCondition(() -> t.getState() == TIMED_WAITING, 1000L));
+
+ //when: Disable throttling
+ when(cpProgress.apply()).thenReturn(null);
+
+ //and: Finish the checkpoint.
+ plc.onFinishCheckpoint();
+
+ //then: All load threads should be unparked.
+ for (Thread t : loadThreads)
+ assertTrue(t.getName(), waitForCondition(() -> t.getState() != TIMED_WAITING, 500L));
+
+ for (Thread t : loadThreads)
+ assertNotEquals(t.getName(), TIMED_WAITING, t.getState());
+ }
+ finally {
+ stopLoad.set(true);
+ }
+ }
+
+ /**
+ *
+ */
@Test
public void wakeupThrottledThread() throws IgniteInterruptedCheckedException {
PagesWriteThrottlePolicy plc = new PagesWriteThrottle(pageMemory2g, null, stateChecker, true, log);
@@ -349,12 +409,12 @@ public class IgniteThrottlingUnitTest {
AtomicInteger written = new AtomicInteger();
- CheckpointProgressImpl cl0 = Mockito.mock(CheckpointProgressImpl.class);
+ CheckpointProgressImpl cl0 = mock(CheckpointProgressImpl.class);
- IgniteOutClosure<CheckpointProgress> cpProgress = Mockito.mock(IgniteOutClosure.class);
- Mockito.when(cpProgress.apply()).thenReturn(cl0);
+ IgniteOutClosure<CheckpointProgress> cpProgress = mock(IgniteOutClosure.class);
+ when(cpProgress.apply()).thenReturn(cl0);
- Mockito.when(cl0.writtenPagesCounter()).thenReturn(written);
+ when(cl0.writtenPagesCounter()).thenReturn(written);
PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, cpProgress, stateChecker, log) {
@Override protected void doPark(long throttleParkTimeNs) {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
index a9a5322..f2bc7e9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
@@ -278,16 +278,19 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
int pagesForStartThrottling = 10;
+ //Number of pages which were poll from checkpoint buffer for throttling.
+ AtomicInteger cpBufferPollPages = new AtomicInteger();
+
// Create a 1 mb page memory.
PageMemoryImpl memory = createPageMemory(
1,
plc,
pageStoreMgr,
pageStoreMgr,
- new IgniteInClosure<FullPageId>() {
- @Override public void apply(FullPageId fullPageId) {
- assertTrue(allocated.contains(fullPageId));
- }
+ (IgniteInClosure<FullPageId>)fullPageId -> {
+ //First increment then get because pageStoreMgr.storedPages always contains at least one page
+ // which was written before throttling.
+ assertEquals(cpBufferPollPages.incrementAndGet(), pageStoreMgr.storedPages.size());
}
);
@@ -305,10 +308,16 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
GridMultiCollectionWrapper<FullPageId> markedPages = memory.beginCheckpoint(new GridFinishedFuture());
- for (int i = 0; i < 10 + (memory.checkpointBufferPagesSize() * 2 / 3); i++)
+ for (int i = 0; i < pagesForStartThrottling + (memory.checkpointBufferPagesSize() * 2 / 3); i++)
writePage(memory, allocated.get(i), (byte)1);
doCheckpoint(markedPages, memory, pageStoreMgr);
+
+ //There is 'pagesForStartThrottling - 1' because we should write pagesForStartThrottling pages
+ // from checkpoint buffer before throttling will be disabled but at least one page always would be written
+ // outside of throttling and in our case we certainly know that this page is also contained in checkpoint buffer
+ // (because all of our pages are in checkpoint buffer).
+ assertEquals(pagesForStartThrottling - 1, cpBufferPollPages.get());
}
/**
@@ -689,7 +698,7 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
*/
private static class TestPageStoreManager extends NoOpPageStoreManager implements PageStoreWriter {
/** */
- private Map<FullPageId, byte[]> storedPages = new HashMap<>();
+ public Map<FullPageId, byte[]> storedPages = new HashMap<>();
/** {@inheritDoc} */
@Override public void read(int grpId, long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteCheckedException {