You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/02/22 07:22:24 UTC

[ignite] branch master updated: IGNITE-16600 Implement tryWakeupThrottledThreads and shouldThrottle for speed based throttle (#9840)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f52f01  IGNITE-16600 Implement tryWakeupThrottledThreads and shouldThrottle for speed based throttle (#9840)
5f52f01 is described below

commit 5f52f010b415fce14d9c1e74365d77b6713e73f2
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Tue Feb 22 11:20:44 2022 +0400

    IGNITE-16600 Implement tryWakeupThrottledThreads and shouldThrottle for speed based throttle (#9840)
    
    Co-authored-by: Anton Kalashnikov <ka...@yandex.ru>
---
 .../pagemem/PagesWriteSpeedBasedThrottle.java      | 30 ++++++-
 .../persistence/pagemem/PagesWriteThrottle.java    |  3 -
 .../pagemem/PagesWriteThrottlePolicy.java          |  3 +
 .../pagemem/IgniteThrottlingUnitTest.java          | 92 ++++++++++++++++++----
 .../persistence/pagemem/PageMemoryImplTest.java    | 21 +++--
 5 files changed, 123 insertions(+), 26 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
index a0ae35f..afa7cfd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
@@ -85,6 +85,9 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy {
     /** Threads set. Contains identifiers of all threads which were marking pages for current checkpoint. */
     private final GridConcurrentHashSet<Long> threadIds = new GridConcurrentHashSet<>();
 
+    /** Threads set. Contains threads which is currently parked because of throttling. */
+    private final GridConcurrentHashSet<Thread> parkedThreads = new GridConcurrentHashSet<>();
+
     /**
      * Used for calculating speed of marking pages dirty.
      * Value from past 750-1000 millis only.
@@ -244,7 +247,14 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy {
                 + " for timeout(ms)=" + (throttleParkTimeNs / 1_000_000));
         }
 
-        LockSupport.parkNanos(throttleParkTimeNs);
+        parkedThreads.add(Thread.currentThread());
+
+        try {
+            LockSupport.parkNanos(throttleParkTimeNs);
+        }
+        finally {
+            parkedThreads.remove(Thread.currentThread());
+        }
     }
 
     /**
@@ -469,6 +479,7 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy {
         speedCpWrite.finishInterval();
         speedMarkAndAvgParkTime.finishInterval();
         threadIds.clear();
+        parkedThreads.forEach(LockSupport::unpark);
     }
 
     /**
@@ -520,6 +531,7 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy {
 
     /**
      * Measurement shows how much throttling time is involved into average marking time.
+     *
      * @return metric started from 0.0 and showing how much throttling is involved into current marking process.
      */
     public double throttleWeight() {
@@ -536,6 +548,22 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy {
         return 1.0 * throttleParkTime() / timeForOnePage;
     }
 
+    /** {@inheritDoc} */
+    @Override public void tryWakeupThrottledThreads() {
+        if (!shouldThrottle()) {
+            exponentialBackoffCntr.set(0);
+
+            parkedThreads.forEach(LockSupport::unpark);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean shouldThrottle() {
+        int checkpointBufLimit = (int)(pageMemory.checkpointBufferPagesSize() * CP_BUF_FILL_THRESHOLD);
+
+        return pageMemory.checkpointBufferPagesCount() > checkpointBufLimit;
+    }
+
     /**
      * Throttling mode for page.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
index ef2fce4..6d62234 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
@@ -48,9 +48,6 @@ public class PagesWriteThrottle implements PagesWriteThrottlePolicy {
     /** Backoff ratio. Each next park will be this times longer. */
     private static final double BACKOFF_RATIO = 1.05;
 
-    /** Checkpoint buffer fullfill upper bound. */
-    private static final float CP_BUF_FILL_THRESHOLD = 2f / 3;
-
     /** Counter for dirty pages ratio throttling. */
     private final AtomicInteger notInCheckpointBackoffCntr = new AtomicInteger(0);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java
index d4ca3c8..406cab4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java
@@ -33,6 +33,9 @@ public interface PagesWriteThrottlePolicy {
     public long LOGGING_THRESHOLD = TimeUnit.SECONDS.toNanos(
         IgniteSystemProperties.getInteger(IGNITE_THROTTLE_LOG_THRESHOLD, DFLT_THROTTLE_LOG_THRESHOLD));
 
+    /** Checkpoint buffer fullfill upper bound. */
+    static final float CP_BUF_FILL_THRESHOLD = 2f / 3;
+
     /**
      * Callback to apply throttling delay.
      * @param isPageInCheckpoint flag indicating if current page is in scope of current checkpoint.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
index 8d2c613..d1e9256 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
@@ -38,17 +38,15 @@ import org.apache.ignite.logger.NullLogger;
 import org.apache.ignite.spi.metric.noop.NoopMetricExporterSpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.GridTestKernalContext;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.testframework.junits.logger.GridTestLog4jLogger;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
-import org.mockito.Mockito;
 
 import static java.lang.Thread.State.TIMED_WAITING;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -57,7 +55,7 @@ import static org.mockito.Mockito.when;
 /**
  *
  */
-public class IgniteThrottlingUnitTest {
+public class IgniteThrottlingUnitTest extends GridCommonAbstractTest {
     /** Per test timeout */
     @Rule
     public Timeout globalTimeout = new Timeout((int)GridTestUtils.DFLT_TEST_TIMEOUT);
@@ -121,8 +119,8 @@ public class IgniteThrottlingUnitTest {
     }
 
     /**
-     * Test that time to park is calculated according to both cpSpeed and mark dirty speed (in case if
-     * checkpoint buffer is not full).
+     * Test that time to park is calculated according to both cpSpeed and mark dirty speed (in case if checkpoint buffer
+     * is not full).
      */
     @Test
     public void testCorrectTimeToPark() {
@@ -131,11 +129,11 @@ public class IgniteThrottlingUnitTest {
         int markDirtySpeed = 34422;
         int cpWriteSpeed = 19416;
         long time = throttle.getParkTime(0.04,
-                ((903150 + 227217) / 2),
-                903150,
-                1,
-                markDirtySpeed,
-                cpWriteSpeed);
+            ((903150 + 227217) / 2),
+            903150,
+            1,
+            markDirtySpeed,
+            cpWriteSpeed);
 
         long mdSpeed = TimeUnit.SECONDS.toNanos(1) / markDirtySpeed;
         long cpSpeed = TimeUnit.SECONDS.toNanos(1) / cpWriteSpeed;
@@ -278,7 +276,69 @@ public class IgniteThrottlingUnitTest {
         assertTrue(time == 0);
     }
 
-    /** */
+    /**
+     * @throws IgniteInterruptedCheckedException if fail.
+     */
+    @Test
+    public void wakeupSpeedBaseThrottledThreadOnCheckpointFinish() throws IgniteInterruptedCheckedException {
+        //given: Enabled throttling with EXPONENTIAL level.
+        CheckpointProgressImpl cl0 = mock(CheckpointProgressImpl.class);
+        when(cl0.writtenPagesCounter()).thenReturn(new AtomicInteger(200));
+
+        IgniteOutClosure<CheckpointProgress> cpProgress = mock(IgniteOutClosure.class);
+        when(cpProgress.apply()).thenReturn(cl0);
+
+        PagesWriteThrottlePolicy plc = new PagesWriteSpeedBasedThrottle(pageMemory2g, cpProgress, stateChecker, log) {
+            @Override protected void doPark(long throttleParkTimeNs) {
+                //Force parking to long time.
+                super.doPark(TimeUnit.SECONDS.toNanos(1));
+            }
+        };
+
+        when(pageMemory2g.checkpointBufferPagesSize()).thenReturn(100);
+        when(pageMemory2g.checkpointBufferPagesCount()).thenAnswer(mock -> 70);
+
+        AtomicBoolean stopLoad = new AtomicBoolean();
+        List<Thread> loadThreads = new ArrayList<>();
+
+        for (int i = 0; i < 10; i++) {
+            loadThreads.add(new Thread(
+                () -> {
+                    while (!stopLoad.get())
+                        plc.onMarkDirty(true);
+                },
+                "load-" + i
+            ));
+        }
+
+        try {
+            loadThreads.forEach(Thread::start);
+
+            //and: All load threads are parked.
+            for (Thread t : loadThreads)
+                assertTrue(t.getName(), waitForCondition(() -> t.getState() == TIMED_WAITING, 1000L));
+
+            //when: Disable throttling
+            when(cpProgress.apply()).thenReturn(null);
+
+            //and: Finish the checkpoint.
+            plc.onFinishCheckpoint();
+
+            //then: All load threads should be unparked.
+            for (Thread t : loadThreads)
+                assertTrue(t.getName(), waitForCondition(() -> t.getState() != TIMED_WAITING, 500L));
+
+            for (Thread t : loadThreads)
+                assertNotEquals(t.getName(), TIMED_WAITING, t.getState());
+        }
+        finally {
+            stopLoad.set(true);
+        }
+    }
+
+    /**
+     *
+     */
     @Test
     public void wakeupThrottledThread() throws IgniteInterruptedCheckedException {
         PagesWriteThrottlePolicy plc = new PagesWriteThrottle(pageMemory2g, null, stateChecker, true, log);
@@ -349,12 +409,12 @@ public class IgniteThrottlingUnitTest {
 
         AtomicInteger written = new AtomicInteger();
 
-        CheckpointProgressImpl cl0 = Mockito.mock(CheckpointProgressImpl.class);
+        CheckpointProgressImpl cl0 = mock(CheckpointProgressImpl.class);
 
-        IgniteOutClosure<CheckpointProgress> cpProgress = Mockito.mock(IgniteOutClosure.class);
-        Mockito.when(cpProgress.apply()).thenReturn(cl0);
+        IgniteOutClosure<CheckpointProgress> cpProgress = mock(IgniteOutClosure.class);
+        when(cpProgress.apply()).thenReturn(cl0);
 
-        Mockito.when(cl0.writtenPagesCounter()).thenReturn(written);
+        when(cl0.writtenPagesCounter()).thenReturn(written);
 
         PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, cpProgress, stateChecker, log) {
             @Override protected void doPark(long throttleParkTimeNs) {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
index a9a5322..f2bc7e9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
@@ -278,16 +278,19 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
 
         int pagesForStartThrottling = 10;
 
+        //Number of pages which were poll from checkpoint buffer for throttling.
+        AtomicInteger cpBufferPollPages = new AtomicInteger();
+
         // Create a 1 mb page memory.
         PageMemoryImpl memory = createPageMemory(
             1,
             plc,
             pageStoreMgr,
             pageStoreMgr,
-            new IgniteInClosure<FullPageId>() {
-                @Override public void apply(FullPageId fullPageId) {
-                    assertTrue(allocated.contains(fullPageId));
-                }
+            (IgniteInClosure<FullPageId>)fullPageId -> {
+                //First increment then get because pageStoreMgr.storedPages always contains at least one page
+                // which was written before throttling.
+                assertEquals(cpBufferPollPages.incrementAndGet(), pageStoreMgr.storedPages.size());
             }
         );
 
@@ -305,10 +308,16 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
 
         GridMultiCollectionWrapper<FullPageId> markedPages = memory.beginCheckpoint(new GridFinishedFuture());
 
-        for (int i = 0; i < 10 + (memory.checkpointBufferPagesSize() * 2 / 3); i++)
+        for (int i = 0; i < pagesForStartThrottling + (memory.checkpointBufferPagesSize() * 2 / 3); i++)
             writePage(memory, allocated.get(i), (byte)1);
 
         doCheckpoint(markedPages, memory, pageStoreMgr);
+
+        //There is 'pagesForStartThrottling - 1' because we should write pagesForStartThrottling pages
+        // from checkpoint buffer before throttling will be disabled but at least one page always would be written
+        // outside of throttling and in our case we certainly know that this page is also contained in checkpoint buffer
+        // (because all of our pages are in checkpoint buffer).
+        assertEquals(pagesForStartThrottling - 1, cpBufferPollPages.get());
     }
 
     /**
@@ -689,7 +698,7 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
      */
     private static class TestPageStoreManager extends NoOpPageStoreManager implements PageStoreWriter {
         /** */
-        private Map<FullPageId, byte[]> storedPages = new HashMap<>();
+        public Map<FullPageId, byte[]> storedPages = new HashMap<>();
 
         /** {@inheritDoc} */
         @Override public void read(int grpId, long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteCheckedException {