You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/05/04 13:59:36 UTC
[ignite-3] branch main updated: IGNITE-16887 [Native Persistence 3.0] Porting a checkpoint and related code, part 1 (#791)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a3875f417 IGNITE-16887 [Native Persistence 3.0] Porting a checkpoint and related code, part 1 (#791)
a3875f417 is described below
commit a3875f417bb889badcfb63319a3f8bb1b95a395c
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Wed May 4 16:59:32 2022 +0300
IGNITE-16887 [Native Persistence 3.0] Porting a checkpoint and related code, part 1 (#791)
---
.../ignite/internal/util/CollectionUtils.java | 24 +-
.../apache/ignite/internal/util/IgniteUtils.java | 28 ++
.../ignite/internal/util/CollectionUtilsTest.java | 33 +-
.../ignite/internal/util/IgniteUtilsTest.java | 45 ++
modules/page-memory/pom.xml | 6 +
.../persistence/ItBplusTreePageMemoryImplTest.java | 2 +
.../ItBplusTreeReuseListPageMemoryImplTest.java | 2 +
...olicyFactory.java => PageMemoryDataRegion.java} | 27 +-
.../pagememory/mem/DirectMemoryProvider.java | 9 +-
.../pagememory/persistence/PageHeader.java | 2 +-
.../pagememory/persistence/PageMemoryImpl.java | 557 +++++++++++++++++++--
.../pagememory/persistence/PageStoreWriter.java | 40 ++
.../persistence/checkpoint/CheckpointPages.java | 91 ++++
.../CheckpointProgress.java} | 25 +-
.../CheckpointReadLockTimeoutException.java} | 26 +-
.../checkpoint/CheckpointReadWriteLock.java | 153 ++++++
.../CheckpointState.java} | 35 +-
.../checkpoint/CheckpointTimeoutLock.java | 233 +++++++++
.../Checkpointer.java} | 25 +-
.../ReentrantReadWriteLockWithTracking.java | 208 ++++++++
.../ClockPageReplacementFlags.java | 4 +-
.../ClockPageReplacementPolicy.java | 5 +-
.../ClockPageReplacementPolicyFactory.java | 6 +-
.../replacement/DelayedDirtyPageStoreWrite.java | 116 +++++
.../replacement/DelayedPageReplacementTracker.java | 224 +++++++++
.../{ => replacement}/PageReplacementPolicy.java | 2 +-
.../PageReplacementPolicyFactory.java | 2 +-
.../RandomLruPageReplacementPolicy.java | 15 +-
.../RandomLruPageReplacementPolicyFactory.java | 2 +-
.../{ => replacement}/SegmentedLruPageList.java | 2 +-
.../SegmentedLruPageReplacementPolicy.java | 3 +-
.../SegmentedLruPageReplacementPolicyFactory.java | 6 +-
.../pagememory/impl/PageMemoryNoLoadSelfTest.java | 14 +-
.../persistence/PageMemoryImplNoLoadTest.java | 74 ++-
.../checkpoint/CheckpointPagesTest.java | 119 +++++
.../checkpoint/CheckpointReadWriteLockTest.java | 201 ++++++++
.../checkpoint/CheckpointTimeoutLockTest.java | 356 +++++++++++++
.../ReentrantReadWriteLockWithTrackingTest.java | 191 +++++++
...gion.java => AbstractPageMemoryDataRegion.java} | 5 +-
.../pagememory/PageMemoryPartitionStorage.java | 3 +-
.../storage/pagememory/PageMemoryTableStorage.java | 4 +-
.../pagememory/VolatilePageMemoryDataRegion.java | 4 +-
42 files changed, 2763 insertions(+), 166 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
index f85f78c43..264f1cbe0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
@@ -146,9 +146,6 @@ public final class CollectionUtils {
}
return new AbstractCollection<>() {
- /** Total size of the collections. */
- int size = -1;
-
/** {@inheritDoc} */
@Override
public Iterator<T> iterator() {
@@ -158,18 +155,27 @@ public final class CollectionUtils {
/** {@inheritDoc} */
@Override
public int size() {
- if (size == -1) {
- int s = 0;
+ int size = 0;
- for (Collection<T> collection : collections) {
- s += collection.size();
- }
+ for (int i = 0; i < collections.length; i++) {
+ size += collections[i].size();
- size = s;
}
return size;
}
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean contains(Object o) {
+ for (int i = 0; i < collections.length; i++) {
+ if (collections[i].contains(o)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
};
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index b8eee8e07..30e0495b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -34,8 +34,11 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -690,4 +693,29 @@ public class IgniteUtils {
public static boolean isPow2(int i) {
return i > 0 && (i & (i - 1)) == 0;
}
+
+ /**
+ * Waits if necessary for this future to complete, and then returns its result ignoring interrupts.
+ *
+ * @return Result value.
+ * @throws CancellationException If this future was cancelled.
+ * @throws ExecutionException If this future completed exceptionally.
+ */
+ public static <T> T getUninterruptibly(CompletableFuture<T> future) throws ExecutionException {
+ boolean interrupted = false;
+
+ try {
+ while (true) {
+ try {
+ return future.get();
+ } catch (InterruptedException e) {
+ interrupted = true;
+ }
+ }
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java
index a0669a78d..39878bf72 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java
@@ -26,10 +26,12 @@ import static org.apache.ignite.internal.util.CollectionUtils.setOf;
import static org.apache.ignite.internal.util.CollectionUtils.union;
import static org.apache.ignite.internal.util.CollectionUtils.viewReadOnly;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -114,13 +116,42 @@ public class CollectionUtilsTest {
@Test
void testCollectionUnion() {
assertTrue(union().isEmpty());
- assertTrue(union().isEmpty());
+ assertTrue(union((Collection<Object>[]) null).isEmpty());
assertTrue(union(List.of()).isEmpty());
assertEquals(List.of(1), collect(union(List.of(1), List.of())));
assertEquals(List.of(1), collect(union(List.of(), List.of(1))));
assertEquals(List.of(1, 2), collect(union(List.of(1), List.of(2))));
+ assertEquals(List.of(1, 2, 2), collect(union(List.of(1), List.of(2), List.of(2))));
+
+ assertFalse(union().contains(0));
+ assertFalse(union(List.of()).contains(0));
+ assertFalse(union(List.of(1)).contains(0));
+ assertFalse(union(List.of(1), List.of()).contains(0));
+ assertFalse(union(List.of(), List.of(1)).contains(0));
+ assertFalse(union(List.of(1), List.of(2, 3)).contains(0));
+
+ assertTrue(union(List.of(0)).contains(0));
+ assertTrue(union(List.of(), List.of(0)).contains(0));
+ assertTrue(union(List.of(0), List.of()).contains(0));
+
+ assertEquals(0, union().size());
+ assertEquals(0, union(List.of()).size());
+ assertEquals(1, union(List.of(1)).size());
+ assertEquals(1, union(List.of(), List.of(1)).size());
+ assertEquals(1, union(List.of(1), List.of()).size());
+ assertEquals(2, union(List.of(1), List.of(2)).size());
+ assertEquals(3, union(List.of(1), List.of(2, 3)).size());
+ assertEquals(5, union(List.of(1, 4, 5), List.of(2, 3)).size());
+
+ List<Integer> integers = new ArrayList<>(List.of(1, 2, 3));
+
+ Collection<Integer> union = union(integers);
+
+ integers.remove(0);
+
+ assertEquals(2, union.size());
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
index 96618ee8a..fcadfccf8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
@@ -17,15 +17,25 @@
package org.apache.ignite.internal.util;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static org.apache.ignite.internal.util.IgniteUtils.getUninterruptibly;
import static org.apache.ignite.internal.util.IgniteUtils.isPow2;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.arrayWithSize;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
/**
@@ -79,4 +89,39 @@ class IgniteUtilsTest {
assertFalse(isPow2(7));
assertFalse(isPow2(9));
}
+
+ @Test
+ void testGetUninterruptibly() throws Exception {
+ assertThat(getUninterruptibly(completedFuture(true)), equalTo(true));
+ assertThat(Thread.currentThread().isInterrupted(), equalTo(false));
+
+ ExecutionException exception0 = assertThrows(
+ ExecutionException.class,
+ () -> getUninterruptibly(failedFuture(new Exception("test")))
+ );
+
+ assertThat(exception0.getCause(), instanceOf(Exception.class));
+ assertThat(exception0.getCause().getMessage(), equalTo("test"));
+ assertThat(Thread.currentThread().isInterrupted(), equalTo(false));
+
+ CompletableFuture<?> canceledFuture = new CompletableFuture<>();
+ canceledFuture.cancel(false);
+
+ assertThrows(CancellationException.class, () -> getUninterruptibly(canceledFuture));
+ assertThat(Thread.currentThread().isInterrupted(), equalTo(false));
+
+ // Checks interrupt.
+
+ runAsync(() -> {
+ try {
+ Thread.currentThread().interrupt();
+
+ getUninterruptibly(completedFuture(null));
+
+ assertThat(Thread.currentThread().isInterrupted(), equalTo(true));
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }).get(1, TimeUnit.SECONDS);
+ }
}
diff --git a/modules/page-memory/pom.xml b/modules/page-memory/pom.xml
index f69863181..598e62ee0 100644
--- a/modules/page-memory/pom.xml
+++ b/modules/page-memory/pom.xml
@@ -80,6 +80,12 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreePageMemoryImplTest.java b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreePageMemoryImplTest.java
index 4c6b688fb..f2170c290 100644
--- a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreePageMemoryImplTest.java
+++ b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreePageMemoryImplTest.java
@@ -53,6 +53,8 @@ public class ItBplusTreePageMemoryImplTest extends ItBplusTreeSelfTest {
sizes,
new TestPageReadWriteManager(),
(page, fullPageId, pageMemoryEx) -> {
+ },
+ (fullPageId, buf, tag) -> {
}
);
}
diff --git a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreeReuseListPageMemoryImplTest.java b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreeReuseListPageMemoryImplTest.java
index fe29150d0..b68c1b834 100644
--- a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreeReuseListPageMemoryImplTest.java
+++ b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreeReuseListPageMemoryImplTest.java
@@ -52,6 +52,8 @@ public class ItBplusTreeReuseListPageMemoryImplTest extends ItBplusTreeReuseSelf
sizes,
new TestPageReadWriteManager(),
(page, fullPageId, pageMemoryEx) -> {
+ },
+ (fullPageId, buf, tag) -> {
}
);
}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicyFactory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
similarity index 59%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicyFactory.java
copy to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
index 0eeb7dcc1..dbc6e3c2f 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicyFactory.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
@@ -15,21 +15,22 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.pagememory.persistence;
+package org.apache.ignite.internal.pagememory;
+
+import org.jetbrains.annotations.Nullable;
/**
- * {@link ClockPageReplacementPolicy} factory.
+ * Data region based on {@link PageMemory}.
*/
-public class ClockPageReplacementPolicyFactory implements PageReplacementPolicyFactory {
- /** {@inheritDoc} */
- @Override
- public long requiredMemory(int pagesCnt) {
- return ClockPageReplacementFlags.requiredMemory(pagesCnt);
- }
+public interface PageMemoryDataRegion {
+ /**
+ * Returns {@code true} if the data region is persistent.
+ */
+ boolean persistent();
- /** {@inheritDoc} */
- @Override
- public PageReplacementPolicy create(PageMemoryImpl.Segment seg, long ptr, int pagesCnt) {
- return new ClockPageReplacementPolicy(seg, ptr, pagesCnt);
- }
+ /**
+ * Returns page memory, {@code null} if not started.
+ */
+ @Nullable
+ PageMemory pageMemory();
}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/DirectMemoryProvider.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/DirectMemoryProvider.java
index f8dfcdcb0..b7534981e 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/DirectMemoryProvider.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/DirectMemoryProvider.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.pagememory.mem;
+import org.jetbrains.annotations.Nullable;
+
/**
* Direct memory provider interface. Not thread-safe.
*/
@@ -26,19 +28,20 @@ public interface DirectMemoryProvider {
*
* @param chunkSizes Chunk sizes.
*/
- public void initialize(long[] chunkSizes);
+ void initialize(long[] chunkSizes);
/**
* Shuts down the provider.
*
* @param deallocate {@code True} to deallocate memory, {@code false} to allow memory reuse.
*/
- public void shutdown(boolean deallocate);
+ void shutdown(boolean deallocate);
/**
* Attempts to allocate next memory region. Will return {@code null} if no more regions are available.
*
* @return Next memory region.
*/
- public DirectMemoryRegion nextRegion();
+ @Nullable
+ DirectMemoryRegion nextRegion();
}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageHeader.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageHeader.java
index d776e21ea..dddf0eb51 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageHeader.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageHeader.java
@@ -33,7 +33,7 @@ import org.apache.ignite.internal.pagememory.FullPageId;
/**
* Page header.
*/
-class PageHeader {
+public class PageHeader {
/** Page marker. */
public static final long PAGE_MARKER = 0x0000000000000001L;
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java
index eb05a3617..cd3c6d2ba 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.pagememory.persistence;
import static java.lang.System.lineSeparator;
+import static org.apache.ignite.internal.pagememory.FullPageId.NULL_PAGE;
import static org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryDataRegionConfigurationSchema.CLOCK_REPLACEMENT_MODE;
import static org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryDataRegionConfigurationSchema.RANDOM_LRU_REPLACEMENT_MODE;
import static org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryDataRegionConfigurationSchema.SEGMENTED_LRU_REPLACEMENT_MODE;
@@ -29,8 +30,10 @@ import static org.apache.ignite.internal.pagememory.io.PageIo.setPageId;
import static org.apache.ignite.internal.pagememory.persistence.PageHeader.dirty;
import static org.apache.ignite.internal.pagememory.persistence.PageHeader.fullPageId;
import static org.apache.ignite.internal.pagememory.persistence.PageHeader.isAcquired;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.readPageId;
import static org.apache.ignite.internal.pagememory.persistence.PageHeader.tempBufferPointer;
import static org.apache.ignite.internal.pagememory.persistence.PageHeader.writeTimestamp;
+import static org.apache.ignite.internal.pagememory.persistence.PagePool.SEGMENT_INDEX_MASK;
import static org.apache.ignite.internal.pagememory.util.PageIdUtils.effectivePageId;
import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageIndex;
@@ -43,25 +46,30 @@ import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapInt;
import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapLong;
import static org.apache.ignite.internal.util.GridUnsafe.copyMemory;
import static org.apache.ignite.internal.util.GridUnsafe.getInt;
+import static org.apache.ignite.internal.util.GridUnsafe.getLong;
import static org.apache.ignite.internal.util.GridUnsafe.putIntVolatile;
import static org.apache.ignite.internal.util.GridUnsafe.setMemory;
import static org.apache.ignite.internal.util.GridUnsafe.wrapPointer;
+import static org.apache.ignite.internal.util.GridUnsafe.zeroMemory;
import static org.apache.ignite.internal.util.IgniteUtils.hash;
import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
import static org.apache.ignite.internal.util.IgniteUtils.readableSize;
import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
import static org.apache.ignite.internal.util.IgniteUtils.toHexString;
import static org.apache.ignite.internal.util.OffheapReadWriteLock.TAG_LOCK_ALWAYS;
+import static org.apache.ignite.lang.IgniteSystemProperties.getBoolean;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
@@ -75,7 +83,14 @@ import org.apache.ignite.internal.pagememory.mem.DirectMemoryRegion;
import org.apache.ignite.internal.pagememory.mem.IgniteOutOfMemoryException;
import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
-import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointPages;
+import org.apache.ignite.internal.pagememory.persistence.replacement.ClockPageReplacementPolicyFactory;
+import org.apache.ignite.internal.pagememory.persistence.replacement.DelayedPageReplacementTracker;
+import org.apache.ignite.internal.pagememory.persistence.replacement.PageReplacementPolicy;
+import org.apache.ignite.internal.pagememory.persistence.replacement.PageReplacementPolicyFactory;
+import org.apache.ignite.internal.pagememory.persistence.replacement.RandomLruPageReplacementPolicyFactory;
+import org.apache.ignite.internal.pagememory.persistence.replacement.SegmentedLruPageReplacementPolicyFactory;
+import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.OffheapReadWriteLock;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteInternalException;
@@ -109,6 +124,13 @@ import org.jetbrains.annotations.TestOnly;
*/
@SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
public class PageMemoryImpl implements PageMemoryEx {
+ /**
+ * When set to {@code true} (default), pages are written to page store without holding segment lock (with delay).
+ * Because another thread may require exactly the same page to be loaded from store, reads are protected by locking.
+ */
+ // TODO: IGNITE-16350 Move to config or something else.
+ public static final String IGNITE_DELAYED_REPLACED_PAGE_WRITE = "IGNITE_DELAYED_REPLACED_PAGE_WRITE";
+
/** Logger. */
private static final IgniteLogger LOG = IgniteLogger.forClass(PageMemoryImpl.class);
@@ -116,10 +138,10 @@ public class PageMemoryImpl implements PageMemoryEx {
public static final long RELATIVE_PTR_MASK = 0xFFFFFFFFFFFFFFL;
/** Invalid relative pointer value. */
- static final long INVALID_REL_PTR = RELATIVE_PTR_MASK;
+ public static final long INVALID_REL_PTR = RELATIVE_PTR_MASK;
/** Pointer which means that this page is outdated (for example, group was destroyed, partition eviction'd happened. */
- static final long OUTDATED_REL_PTR = INVALID_REL_PTR + 1;
+ public static final long OUTDATED_REL_PTR = INVALID_REL_PTR + 1;
/** Page lock offset. */
public static final int PAGE_LOCK_OFFSET = 32;
@@ -148,7 +170,8 @@ public class PageMemoryImpl implements PageMemoryEx {
/** Direct memory allocator. */
private final DirectMemoryProvider directMemoryProvider;
- /** Segments array. */
+ /** Segments array, {@code null} if not {@link #start() started}. */
+ @Nullable
private volatile Segment[] segments;
/** Lock for segments changes. */
@@ -168,21 +191,39 @@ public class PageMemoryImpl implements PageMemoryEx {
/** Flag indicating page replacement started (rotation with disk), allocating new page requires freeing old one. */
private volatile int pageReplacementWarned;
- /** Segments sizes. */
+ /** Segments sizes, the last one being the {@link #checkpointPool checkpoint buffer} size. */
private final long[] sizes;
/** {@code False} if memory was not started or already stopped and is not supposed for any usage. */
private volatile boolean started;
+ /** See {@link #safeToUpdate()}. */
+ private final AtomicBoolean safeToUpdate = new AtomicBoolean(true);
+
+ /** Checkpoint page pool, {@code null} if not {@link #start() started}. */
+ @Nullable
+ private volatile PagePool checkpointPool;
+
+ /**
+ * Delayed page replacement (rotation with disk) tracker. Because another thread may require exactly the same page to be loaded from
+ * store, reads are protected by locking. {@code null} if delayed write functionality is disabled.
+ */
+ @Nullable
+ private final DelayedPageReplacementTracker delayedPageReplacementTracker;
+
+ /** Flush dirty page closure. */
+ private final PageStoreWriter flushDirtyPage;
+
/**
* Constructor.
*
* @param directMemoryProvider Memory allocator to use.
* @param dataRegionCfg Data region configuration.
* @param ioRegistry IO registry.
- * @param sizes Segments sizes.
+ * @param sizes Segments sizes, the last one being the checkpoint buffer size.
* @param pmPageMgr Page store manager.
* @param changeTracker Callback invoked to track changes in pages.
+ * @param flushDirtyPage Write callback invoked when a dirty page is removed for replacement.
*/
public PageMemoryImpl(
DirectMemoryProvider directMemoryProvider,
@@ -190,7 +231,8 @@ public class PageMemoryImpl implements PageMemoryEx {
PageIoRegistry ioRegistry,
long[] sizes,
PageReadWriteManager pmPageMgr,
- @Nullable PageChangeTracker changeTracker
+ @Nullable PageChangeTracker changeTracker,
+ PageStoreWriter flushDirtyPage
) {
this.directMemoryProvider = directMemoryProvider;
this.dataRegionCfg = dataRegionCfg.value();
@@ -198,6 +240,7 @@ public class PageMemoryImpl implements PageMemoryEx {
this.sizes = sizes;
this.pmPageMgr = pmPageMgr;
this.changeTracker = changeTracker;
+ this.flushDirtyPage = flushDirtyPage;
int pageSize = this.dataRegionCfg.pageSize();
@@ -223,6 +266,9 @@ public class PageMemoryImpl implements PageMemoryEx {
default:
throw new IgniteInternalException("Unexpected page replacement mode: " + replacementMode);
}
+
+ delayedPageReplacementTracker = getBoolean(IGNITE_DELAYED_REPLACED_PAGE_WRITE, true)
+ ? new DelayedPageReplacementTracker(pageSize, flushDirtyPage, LOG, sizes.length - 1) : null;
}
/** {@inheritDoc} */
@@ -255,6 +301,8 @@ public class PageMemoryImpl implements PageMemoryEx {
DirectMemoryRegion cpReg = regions.get(regs - 1);
+ checkpointPool = new PagePool(regs - 1, cpReg, sysPageSize, rwLock);
+
long checkpointBuf = cpReg.size();
long totalAllocated = 0;
@@ -542,8 +590,6 @@ public class PageMemoryImpl implements PageMemoryEx {
/** {@inheritDoc} */
@Override
public long acquirePage(int grpId, long pageId, IoStatisticsHolder statHolder) throws IgniteInternalCheckedException {
- assert started;
-
return acquirePage(grpId, pageId, statHolder, false);
}
@@ -750,23 +796,6 @@ public class PageMemoryImpl implements PageMemoryEx {
return pageSize();
}
- /**
- * Returns Max dirty ratio from the segments.
- */
- double getDirtyPagesRatio() {
- if (segments == null) {
- return 0;
- }
-
- double res = 0;
-
- for (Segment segment : segments) {
- res = Math.max(res, segment.getDirtyPagesRatio());
- }
-
- return res;
- }
-
/**
* Returns total pages can be placed in all segments.
*/
@@ -1014,6 +1043,44 @@ public class PageMemoryImpl implements PageMemoryEx {
private long postWriteLockPage(long absPtr, FullPageId fullId) {
writeTimestamp(absPtr, coarseCurrentTimeMillis());
+ // Create a buffer copy if the page is scheduled for a checkpoint.
+ if (isInCheckpoint(fullId) && tempBufferPointer(absPtr) == INVALID_REL_PTR) {
+ long tmpRelPtr = checkpointPool.borrowOrAllocateFreePage(tag(fullId.pageId()));
+
+ if (tmpRelPtr == INVALID_REL_PTR) {
+ rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, OffheapReadWriteLock.TAG_LOCK_ALWAYS);
+
+ throw new IgniteInternalException(
+ "Failed to allocate temporary buffer for checkpoint (increase checkpointPageBufferSize configuration property): "
+ + dataRegionCfg.name());
+ }
+
+ // Pin the page until the checkpoint is finished.
+ PageHeader.acquirePage(absPtr);
+
+ long tmpAbsPtr = checkpointPool.absolute(tmpRelPtr);
+
+ copyMemory(
+ null,
+ absPtr + PAGE_OVERHEAD,
+ null,
+ tmpAbsPtr + PAGE_OVERHEAD,
+ pageSize()
+ );
+
+ assert getType(tmpAbsPtr + PAGE_OVERHEAD) != 0 : "Invalid state. Type is 0! pageId = " + hexLong(fullId.pageId());
+ assert getVersion(tmpAbsPtr + PAGE_OVERHEAD) != 0 :
+ "Invalid state. Version is 0! pageId = " + hexLong(fullId.pageId());
+
+ dirty(absPtr, false);
+ tempBufferPointer(absPtr, tmpRelPtr);
+ // info for checkpoint buffer cleaner.
+ fullPageId(tmpAbsPtr, fullId);
+
+ assert getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO GG-11480
+ assert getCrc(tmpAbsPtr + PAGE_OVERHEAD) == 0; //TODO GG-11480
+ }
+
assert getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO IGNITE-16612
return absPtr + PAGE_OVERHEAD;
@@ -1042,7 +1109,7 @@ public class PageMemoryImpl implements PageMemoryEx {
long pageId = getPageId(page + PAGE_OVERHEAD);
try {
- assert pageId != 0 : hexLong(PageHeader.readPageId(page));
+ assert pageId != 0 : hexLong(readPageId(page));
rwLock.writeUnlock(page + PAGE_LOCK_OFFSET, tag(pageId));
@@ -1119,11 +1186,15 @@ public class PageMemoryImpl implements PageMemoryEx {
boolean wasDirty = dirty(absPtr, dirty);
if (dirty) {
+ // TODO: IGNITE-16898 Don't forget to add an assertion that the checkpoint lock is held by this thread
+
if (!wasDirty || forceAdd) {
Segment seg = segment(pageId.groupId(), pageId.pageId());
if (seg.dirtyPages.add(pageId)) {
- seg.dirtyPagesCntr.incrementAndGet();
+ if (seg.dirtyPagesCntr.incrementAndGet() >= seg.maxDirtyPages) {
+ safeToUpdate.set(false);
+ }
}
}
} else {
@@ -1141,7 +1212,14 @@ public class PageMemoryImpl implements PageMemoryEx {
return segments[idx];
}
- private static int segmentIndex(int grpId, long pageId, int segments) {
+ /**
+ * Returns segment index.
+ *
+ * @param grpId Group ID.
+ * @param pageId Page ID.
+ * @param segments Number of segments.
+ */
+ public static int segmentIndex(int grpId, long pageId, int segments) {
pageId = effectivePageId(pageId);
// Take a prime number larger than total number of partitions.
@@ -1154,12 +1232,12 @@ public class PageMemoryImpl implements PageMemoryEx {
* Returns a collection of all pages currently marked as dirty. Will create a collection copy.
*/
@TestOnly
- public Collection<FullPageId> dirtyPages() {
+ public Set<FullPageId> dirtyPages() {
if (segments == null) {
- return Collections.emptySet();
+ return Set.of();
}
- Collection<FullPageId> res = new HashSet<>((int) loadedPages());
+ Set<FullPageId> res = new HashSet<>((int) loadedPages());
for (Segment seg : segments) {
res.addAll(seg.dirtyPages);
@@ -1171,7 +1249,7 @@ public class PageMemoryImpl implements PageMemoryEx {
/**
* Page segment.
*/
- class Segment extends ReentrantReadWriteLock {
+ public class Segment extends ReentrantReadWriteLock {
/** Serial version uid. */
private static final long serialVersionUID = 0L;
@@ -1200,11 +1278,15 @@ public class PageMemoryImpl implements PageMemoryEx {
private long memPerRepl;
/** Pages marked as dirty since the last checkpoint. */
- private volatile Collection<FullPageId> dirtyPages = ConcurrentHashMap.newKeySet();
+ private volatile Set<FullPageId> dirtyPages = ConcurrentHashMap.newKeySet();
/** Atomic size counter for {@link #dirtyPages}. */
private final AtomicLong dirtyPagesCntr = new AtomicLong();
+ /** Wrapper of pages of current checkpoint. */
+ @Nullable
+ private volatile CheckpointPages checkpointPages;
+
/** Maximum number of dirty pages. */
private final long maxDirtyPages;
@@ -1270,13 +1352,6 @@ public class PageMemoryImpl implements PageMemoryEx {
}
}
- /**
- * Returns dirtyRatio to be compared with Throttle threshold.
- */
- private double getDirtyPagesRatio() {
- return dirtyPagesCntr.doubleValue() / pages();
- }
-
/**
* Returns max number of pages this segment can allocate.
*/
@@ -1357,12 +1432,39 @@ public class PageMemoryImpl implements PageMemoryEx {
}
if (isDirty(absPtr)) {
- return false;
- }
+ CheckpointPages checkpointPages = this.checkpointPages;
+ // Can evict a dirty page only if it should be written by a checkpoint.
+ // These pages do not have a tmp buffer.
+ if (checkpointPages != null && checkpointPages.allowToSave(fullPageId)) {
+ assert pmPageMgr != null;
+
+ PageStoreWriter saveDirtyPage = delayedPageReplacementTracker != null
+ ? delayedPageReplacementTracker.delayedPageWrite() : flushDirtyPage;
- loadedPages.remove(fullPageId.groupId(), fullPageId.effectivePageId());
+ saveDirtyPage.writePage(
+ fullPageId,
+ wrapPointer(absPtr + PAGE_OVERHEAD, pageSize()),
+ partGeneration(
+ fullPageId.groupId(),
+ partitionId(fullPageId.pageId())
+ )
+ );
+
+ setDirty(fullPageId, absPtr, false, true);
+
+ checkpointPages.markAsSaved(fullPageId);
+
+ loadedPages.remove(fullPageId.groupId(), fullPageId.effectivePageId());
+
+ return true;
+ }
+
+ return false;
+ } else {
+ loadedPages.remove(fullPageId.groupId(), fullPageId.effectivePageId());
- return true;
+ return true;
+ }
}
/**
@@ -1386,10 +1488,29 @@ public class PageMemoryImpl implements PageMemoryEx {
dirty(absPtr, false);
+ long tmpBufPtr = tempBufferPointer(absPtr);
+
+ if (tmpBufPtr != INVALID_REL_PTR) {
+ zeroMemory(checkpointPool.absolute(tmpBufPtr) + PAGE_OVERHEAD, pageSize());
+
+ tempBufferPointer(absPtr, INVALID_REL_PTR);
+
+ // We pinned the page when allocated the temp buffer, release it now.
+ PageHeader.releasePage(absPtr);
+
+ releaseCheckpointBufferPage(tmpBufPtr);
+ }
+
if (rmv) {
loadedPages.remove(grpId, effectivePageId(pageId));
}
+ CheckpointPages cpPages = checkpointPages;
+
+ if (cpPages != null) {
+ cpPages.markAsSaved(new FullPageId(pageId, grpId));
+ }
+
Collection<FullPageId> dirtyPages = this.dirtyPages;
if (dirtyPages != null) {
@@ -1549,9 +1670,16 @@ public class PageMemoryImpl implements PageMemoryEx {
/**
* Returns IO registry.
*/
- PageIoRegistry ioRegistry() {
+ public PageIoRegistry ioRegistry() {
return ioRegistry;
}
+
+ /**
+ * Gets checkpoint pages.
+ */
+ public CheckpointPages checkpointPages() {
+ return checkpointPages;
+ }
}
private static int updateAtomicInt(long ptr, int delta) {
@@ -1568,7 +1696,7 @@ public class PageMemoryImpl implements PageMemoryEx {
private static long updateAtomicLong(long ptr, long delta) {
while (true) {
- long old = GridUnsafe.getLong(ptr);
+ long old = getLong(ptr);
long updated = old + delta;
@@ -1598,4 +1726,337 @@ public class PageMemoryImpl implements PageMemoryEx {
*/
void apply(long page, FullPageId fullPageId, PageMemoryEx pageMemoryEx);
}
+
+ /**
+ * Heuristic method which allows a thread to check if it is safe to start memory structure modifications with regard to checkpointing.
+ * May return false-negative result during or after partition eviction.
+ *
+ * @return {@code False} if there are too many dirty pages and a thread should wait for a checkpoint to begin.
+ */
+ public boolean safeToUpdate() {
+ if (segments != null) {
+ return safeToUpdate.get();
+ }
+
+ return true;
+ }
+
+ /**
+ * Returns number of pages used in checkpoint buffer.
+ */
+ public int usedCheckpointBufferPages() {
+ PagePool checkpointPool = this.checkpointPool;
+
+ return checkpointPool == null ? 0 : checkpointPool.size();
+ }
+
+ /**
+ * Returns max number of pages in checkpoint buffer.
+ */
+ public int maxCheckpointBufferPages() {
+ PagePool checkpointPool = this.checkpointPool;
+
+ return checkpointPool == null ? 0 : checkpointPool.pages();
+ }
+
+ private void releaseCheckpointBufferPage(long tmpBufPtr) {
+ checkpointPool.releaseFreePage(tmpBufPtr);
+ }
+
+ /**
+ * Returns {@code true} if it was added to the checkpoint list.
+ *
+ * @param pageId Page ID to check if it was added to the checkpoint list.
+ */
+ boolean isInCheckpoint(FullPageId pageId) {
+ Segment seg = segment(pageId.groupId(), pageId.pageId());
+
+ CheckpointPages pages0 = seg.checkpointPages;
+
+ return pages0 != null && pages0.contains(pageId);
+ }
+
+ /**
+ * Returns {@code true} if the page was successfully removed from the pages of the current checkpoint.
+ *
+ * @param fullPageId Page ID to clear.
+ */
+ boolean clearCheckpoint(FullPageId fullPageId) {
+ Segment seg = segment(fullPageId.groupId(), fullPageId.pageId());
+
+ CheckpointPages pages0 = seg.checkpointPages;
+
+ assert pages0 != null;
+
+ return pages0.markAsSaved(fullPageId);
+ }
+
+ /**
+ * Makes a full copy of the dirty page for checkpointing, then marks the page as not dirty.
+ *
+ * @param absPtr Absolute page pointer.
+ * @param fullId Full page id.
+ * @param buf Buffer for copy page content for future write via {@link PageStoreWriter}.
+ * @param pageSingleAcquire Page is acquired only once. We don't pin the page a second time (until the page is copied) in case
+ * the checkpoint temporary buffer is used.
+ * @param pageStoreWriter Checkpoint page writer.
+ */
+ private void copyPageForCheckpoint(
+ long absPtr,
+ FullPageId fullId,
+ ByteBuffer buf,
+ int tag,
+ boolean pageSingleAcquire,
+ PageStoreWriter pageStoreWriter
+ ) throws IgniteInternalCheckedException {
+ assert absPtr != 0;
+ assert isAcquired(absPtr) || !isInCheckpoint(fullId);
+
+ // Exception protection flag.
+ // No need to write if exception occurred.
+ boolean canWrite = false;
+
+ boolean locked = rwLock.tryWriteLock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS);
+
+ if (!locked) {
+ // We release the page only once here because this page will be copied sometime later and
+ // will be released properly then.
+ if (!pageSingleAcquire) {
+ PageHeader.releasePage(absPtr);
+ }
+
+ buf.clear();
+
+ if (isInCheckpoint(fullId)) {
+ pageStoreWriter.writePage(fullId, buf, TRY_AGAIN_TAG);
+ }
+
+ return;
+ }
+
+ if (!clearCheckpoint(fullId)) {
+ rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS);
+
+ if (!pageSingleAcquire) {
+ PageHeader.releasePage(absPtr);
+ }
+
+ return;
+ }
+
+ try {
+ long tmpRelPtr = tempBufferPointer(absPtr);
+
+ if (tmpRelPtr != INVALID_REL_PTR) {
+ tempBufferPointer(absPtr, INVALID_REL_PTR);
+
+ long tmpAbsPtr = checkpointPool.absolute(tmpRelPtr);
+
+ copyInBuffer(tmpAbsPtr, buf);
+
+ fullPageId(tmpAbsPtr, NULL_PAGE);
+
+ zeroMemory(tmpAbsPtr + PAGE_OVERHEAD, pageSize());
+
+ releaseCheckpointBufferPage(tmpRelPtr);
+
+ // Need release again because we pin page when resolve abs pointer,
+ // and page did not have tmp buffer page.
+ if (!pageSingleAcquire) {
+ PageHeader.releasePage(absPtr);
+ }
+ } else {
+ copyInBuffer(absPtr, buf);
+
+ dirty(absPtr, false);
+ }
+
+ assert getType(buf) != 0 : "Invalid state. Type is 0! pageId = " + hexLong(fullId.pageId());
+ assert getVersion(buf) != 0 : "Invalid state. Version is 0! pageId = " + hexLong(fullId.pageId());
+
+ canWrite = true;
+ } finally {
+ rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS);
+
+ if (canWrite) {
+ buf.rewind();
+
+ pageStoreWriter.writePage(fullId, buf, tag);
+
+ buf.rewind();
+ }
+
+ // We pinned the page either when allocated the temp buffer, or when resolved abs pointer.
+ // Must release the page only after write unlock.
+ PageHeader.releasePage(absPtr);
+ }
+ }
+
+ /**
+ * Prepares a page for writing during a checkpoint. {@link PageStoreWriter} will be called when the page is ready to be written.
+ *
+ * @param fullId Page ID to get byte buffer for. The page ID must be present in the collection returned by the {@link
+ * #beginCheckpoint(CompletableFuture)} method call.
+ * @param buf Temporary buffer to write changes into.
+ * @param pageStoreWriter Checkpoint page write context.
+ * @throws IgniteInternalCheckedException If failed to obtain page data.
+ */
+ public void checkpointWritePage(
+ FullPageId fullId,
+ ByteBuffer buf,
+ PageStoreWriter pageStoreWriter
+ ) throws IgniteInternalCheckedException {
+ assert buf.remaining() == pageSize();
+
+ Segment seg = segment(fullId.groupId(), fullId.pageId());
+
+ long absPtr = 0;
+
+ long relPtr;
+
+ int tag;
+
+ boolean pageSingleAcquire = false;
+
+ seg.readLock().lock();
+
+ try {
+ if (!isInCheckpoint(fullId)) {
+ return;
+ }
+
+ relPtr = resolveRelativePointer(seg, fullId, tag = generationTag(seg, fullId));
+
+ // Page may have been cleared during eviction. We have nothing to do in this case.
+ if (relPtr == INVALID_REL_PTR) {
+ return;
+ }
+
+ if (relPtr != OUTDATED_REL_PTR) {
+ absPtr = seg.absolute(relPtr);
+
+ // Pin the page until it is copied. This helps to prevent replacement of this page.
+ if (tempBufferPointer(absPtr) == INVALID_REL_PTR) {
+ PageHeader.acquirePage(absPtr);
+ } else {
+ pageSingleAcquire = true;
+ }
+ }
+ } finally {
+ seg.readLock().unlock();
+ }
+
+ if (relPtr == OUTDATED_REL_PTR) {
+ seg.writeLock().lock();
+
+ try {
+ // Double-check.
+ relPtr = resolveRelativePointer(seg, fullId, generationTag(seg, fullId));
+
+ if (relPtr == INVALID_REL_PTR) {
+ return;
+ }
+
+ if (relPtr == OUTDATED_REL_PTR) {
+ relPtr = seg.refreshOutdatedPage(
+ fullId.groupId(),
+ fullId.effectivePageId(),
+ true
+ );
+
+ seg.pageReplacementPolicy.onRemove(relPtr);
+
+ seg.pool.releaseFreePage(relPtr);
+ }
+
+ return;
+ } finally {
+ seg.writeLock().unlock();
+ }
+ }
+
+ copyPageForCheckpoint(absPtr, fullId, buf, tag, pageSingleAcquire, pageStoreWriter);
+ }
+
+ /**
+ * Get arbitrary page from cp buffer.
+ */
+ public FullPageId pullPageFromCpBuffer() {
+ long idx = getLong(checkpointPool.lastAllocatedIdxPtr);
+
+ long lastIdx = ThreadLocalRandom.current().nextLong(idx / 2, idx);
+
+ while (--lastIdx > 1) {
+ assert (lastIdx & SEGMENT_INDEX_MASK) == 0L;
+
+ long relative = checkpointPool.relative(lastIdx);
+
+ long freePageAbsPtr = checkpointPool.absolute(relative);
+
+ FullPageId fullPageId = fullPageId(freePageAbsPtr);
+
+ if (fullPageId.pageId() == NULL_PAGE.pageId() || fullPageId.groupId() == NULL_PAGE.groupId()) {
+ continue;
+ }
+
+ if (!isInCheckpoint(fullPageId)) {
+ continue;
+ }
+
+ return fullPageId;
+ }
+
+ return NULL_PAGE;
+ }
+
+ /**
+ * Gets a collection of dirty page IDs since the last checkpoint. If a dirty page is being written after the checkpointing operation
+ * began, the modifications will be written to a temporary buffer which will be flushed to the main memory after the checkpointing
+ * finished. This method must be called when no concurrent operations on pages are performed.
+ *
+ * @param allowToReplace The sign which allows replacing pages from a checkpoint by page replacer.
+ * @return Collection view of dirty page IDs.
+ * @throws IgniteInternalException If checkpoint has been already started and was not finished.
+ */
+ public Collection<FullPageId> beginCheckpoint(CompletableFuture<?> allowToReplace) throws IgniteInternalException {
+ if (segments == null) {
+ return List.of();
+ }
+
+ Collection<FullPageId>[] collections = new Collection[segments.length];
+
+ for (int i = 0; i < segments.length; i++) {
+ Segment seg = segments[i];
+
+ if (seg.checkpointPages != null) {
+ throw new IgniteInternalException("Failed to begin checkpoint (it is already in progress).");
+ }
+
+ Set<FullPageId> dirtyPages = seg.dirtyPages;
+ collections[i] = dirtyPages;
+
+ seg.checkpointPages = new CheckpointPages(dirtyPages, allowToReplace);
+
+ seg.resetDirtyPages();
+ }
+
+ safeToUpdate.set(true);
+
+ return CollectionUtils.union(collections);
+ }
+
+ /**
+ * Finishes checkpoint operation.
+ */
+ public void finishCheckpoint() {
+ if (segments == null) {
+ return;
+ }
+
+ synchronized (segmentsLock) {
+ for (Segment seg : segments) {
+ seg.checkpointPages = null;
+ }
+ }
+ }
}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageStoreWriter.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageStoreWriter.java
new file mode 100644
index 000000000..99921e9dd
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageStoreWriter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * Interface for writing a page to {@link PageStore}.
+ */
+// TODO: IGNITE-15818 Maybe refactor.
+public interface PageStoreWriter {
+ /**
+ * Callback for writing a page. {@link PageMemoryImpl} will copy the page content to the buffer before the call.
+ *
+ * @param fullPageId Page ID to get byte buffer for. The page ID must be present in the collection returned by the {@link
+ * PageMemoryImpl#beginCheckpoint(CompletableFuture)} method call.
+ * @param buf Temporary buffer to write changes into.
+ * @param tag Partition generation if the page data was copied, or a marker such as {@code TRY_AGAIN_TAG} if it was not.
+ * @throws IgniteInternalCheckedException If write page failed.
+ */
+ void writePage(FullPageId fullPageId, ByteBuffer buf, int tag) throws IgniteInternalCheckedException;
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java
new file mode 100644
index 000000000..4c48a9770
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static org.apache.ignite.internal.util.IgniteUtils.getUninterruptibly;
+
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * View of pages which should be stored during current checkpoint.
+ */
+public class CheckpointPages {
+ private final Set<FullPageId> segCheckpointPages;
+
+ private final CompletableFuture<?> allowToReplace;
+
+ /**
+ * Constructor.
+ *
+ * @param pages Pages which would be stored to disk in current checkpoint, does not copy the set.
+ * @param replaceFuture The sign which allows replacing pages from a checkpoint by page replacer.
+ */
+ public CheckpointPages(Set<FullPageId> pages, CompletableFuture<?> replaceFuture) {
+ segCheckpointPages = pages;
+ allowToReplace = replaceFuture;
+ }
+
+ /**
+ * Returns {@code true} if fullPageId is allowed to be stored to disk.
+ *
+ * @param fullPageId Page id for checking.
+ * @throws IgniteInternalCheckedException If waiting for the sign which allows the page replacer to replace checkpoint pages fails.
+ */
+ public boolean allowToSave(FullPageId fullPageId) throws IgniteInternalCheckedException {
+ try {
+ // Uninterruptibly is important because otherwise in case of interrupt of client thread node would be stopped.
+ getUninterruptibly(allowToReplace);
+ } catch (ExecutionException e) {
+ throw new IgniteInternalCheckedException(e.getCause());
+ } catch (CancellationException e) {
+ throw new IgniteInternalCheckedException(e);
+ }
+
+ return segCheckpointPages.contains(fullPageId);
+ }
+
+ /**
+ * Returns {@code true} if fullPageId is a candidate to be stored to disk by the current checkpoint.
+ *
+ * @param fullPageId Page id for checking.
+ */
+ public boolean contains(FullPageId fullPageId) {
+ return segCheckpointPages.contains(fullPageId);
+ }
+
+ /**
+ * Returns {@code true} if marking was successful.
+ *
+ * @param fullPageId Page id which should be marked as saved to disk.
+ */
+ public boolean markAsSaved(FullPageId fullPageId) {
+ return segCheckpointPages.remove(fullPageId);
+ }
+
+ /**
+ * Returns size of all pages in current checkpoint.
+ */
+ public int size() {
+ return segCheckpointPages.size();
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicyFactory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java
similarity index 59%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicyFactory.java
copy to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java
index 0eeb7dcc1..7c9d5eb89 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicyFactory.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java
@@ -15,21 +15,18 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.pagememory.persistence;
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+
+import java.util.concurrent.CompletableFuture;
/**
- * {@link ClockPageReplacementPolicy} factory.
+ * Represents information of progress of a current checkpoint and allows obtaining future to wait for a particular checkpoint state.
*/
-public class ClockPageReplacementPolicyFactory implements PageReplacementPolicyFactory {
- /** {@inheritDoc} */
- @Override
- public long requiredMemory(int pagesCnt) {
- return ClockPageReplacementFlags.requiredMemory(pagesCnt);
- }
-
- /** {@inheritDoc} */
- @Override
- public PageReplacementPolicy create(PageMemoryImpl.Segment seg, long ptr, int pagesCnt) {
- return new ClockPageReplacementPolicy(seg, ptr, pagesCnt);
- }
+// TODO: IGNITE-16898 Continue porting the code
+public interface CheckpointProgress {
+ /**
+ * Returns future which can be used for detection when current checkpoint reaches the specific state.
+ */
+ CompletableFuture<?> futureFor(CheckpointState state);
}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicyFactory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadLockTimeoutException.java
similarity index 58%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicyFactory.java
copy to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadLockTimeoutException.java
index 651cc14df..300fd67a2 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicyFactory.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadLockTimeoutException.java
@@ -15,23 +15,23 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.pagememory.persistence;
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
-import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.Segment;
+import org.apache.ignite.lang.IgniteInternalException;
/**
- * {@link RandomLruPageReplacementPolicy} factory.
+ * Indicates checkpoint read lock acquisition failure which did not lead to node invalidation.
*/
-public class RandomLruPageReplacementPolicyFactory implements PageReplacementPolicyFactory {
- /** {@inheritDoc} */
- @Override
- public long requiredMemory(int pagesCnt) {
- return 0;
- }
+// TODO: IGNITE-16899 Change to inherit from IgniteInternalCheckedException
+public class CheckpointReadLockTimeoutException extends IgniteInternalException {
+ private static final long serialVersionUID = 0L;
- /** {@inheritDoc} */
- @Override
- public PageReplacementPolicy create(Segment seg, long ptr, int pagesCnt) {
- return new RandomLruPageReplacementPolicy(seg);
+ /**
+ * Constructor.
+ *
+ * @param msg Error message.
+ */
+ CheckpointReadLockTimeoutException(String msg) {
+ super(msg);
}
}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLock.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLock.java
new file mode 100644
index 000000000..339f9a28f
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLock.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.lang.IgniteInternalException;
+
+/**
+ * Wrapper of the classic read write lock with checkpoint features.
+ */
+public class CheckpointReadWriteLock {
+ private final ThreadLocal<Integer> checkpointReadLockHoldCount = ThreadLocal.withInitial(() -> 0);
+
+ /**
+ * Any thread with such a prefix is managed by the checkpoint, so some conditions can rely on it (e.g. we don't need a checkpoint
+ * lock there because the checkpoint already holds the write lock).
+ */
+ // TODO: IGNITE-16898 I think it needs to be redone or relocated
+ static final String CHECKPOINT_RUNNER_THREAD_PREFIX = "checkpoint-runner";
+
+ /** Checkpoint lock. */
+ private final ReentrantReadWriteLockWithTracking checkpointLock;
+
+ /**
+ * Constructor.
+ *
+ * @param checkpointLock Checkpoint lock.
+ */
+ public CheckpointReadWriteLock(ReentrantReadWriteLockWithTracking checkpointLock) {
+ this.checkpointLock = checkpointLock;
+ }
+
+ /**
+ * Gets the checkpoint read lock.
+ *
+ * @throws IgniteInternalException If failed.
+ */
+ public void readLock() {
+ if (isWriteLockHeldByCurrentThread()) {
+ return;
+ }
+
+ checkpointLock.readLock().lock();
+
+ checkpointReadLockHoldCount.set(checkpointReadLockHoldCount.get() + 1);
+ }
+
+ /**
+ * Tries to get a checkpoint read lock.
+ *
+ * @param timeout Time to wait for the read lock.
+ * @param unit Time unit of the timeout argument.
+ * @throws InterruptedException If the current thread is interrupted while waiting for the read lock.
+ */
+ public boolean tryReadLock(long timeout, TimeUnit unit) throws InterruptedException {
+ if (isWriteLockHeldByCurrentThread()) {
+ return true;
+ }
+
+ boolean res = checkpointLock.readLock().tryLock(timeout, unit);
+
+ if (res) {
+ checkpointReadLockHoldCount.set(checkpointReadLockHoldCount.get() + 1);
+ }
+
+ return res;
+ }
+
+ /**
+ * Tries to get a checkpoint read lock.
+ *
+ * @return {@code True} if the checkpoint read lock is acquired.
+ */
+ public boolean tryReadLock() {
+ if (isWriteLockHeldByCurrentThread()) {
+ return true;
+ }
+
+ boolean res = checkpointLock.readLock().tryLock();
+
+ if (res) {
+ checkpointReadLockHoldCount.set(checkpointReadLockHoldCount.get() + 1);
+ }
+
+ return res;
+ }
+
+ /**
+ * Returns {@code true} if checkpoint lock is held by current thread.
+ */
+ public boolean checkpointLockIsHeldByThread() {
+ return checkpointLock.isWriteLockedByCurrentThread()
+ || checkpointReadLockHoldCount.get() > 0
+ || Thread.currentThread().getName().startsWith(CHECKPOINT_RUNNER_THREAD_PREFIX);
+ }
+
+ /**
+ * Releases the checkpoint read lock.
+ */
+ public void readUnlock() {
+ if (isWriteLockHeldByCurrentThread()) {
+ return;
+ }
+
+ checkpointLock.readLock().unlock();
+
+ checkpointReadLockHoldCount.set(checkpointReadLockHoldCount.get() - 1);
+ }
+
+ /**
+ * Takes the checkpoint write lock.
+ */
+ public void writeLock() {
+ checkpointLock.writeLock().lock();
+ }
+
+ /**
+ * Releases the checkpoint write lock.
+ */
+ public void writeUnlock() {
+ checkpointLock.writeLock().unlock();
+ }
+
+ /**
+ * Returns {@code true} if the current thread holds the write lock.
+ */
+ public boolean isWriteLockHeldByCurrentThread() {
+ return checkpointLock.isWriteLockedByCurrentThread();
+ }
+
+ /**
+ * Returns the number of reentrant read holds on this lock by the current thread. A reader thread has a hold on a lock for each lock
+ * action that is not matched by an unlock action.
+ */
+ public int getReadHoldCount() {
+ return checkpointLock.getReadHoldCount();
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicyFactory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointState.java
similarity index 54%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicyFactory.java
copy to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointState.java
index 651cc14df..b246e7736 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicyFactory.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointState.java
@@ -15,23 +15,28 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.pagememory.persistence;
-
-import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.Segment;
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
/**
- * {@link RandomLruPageReplacementPolicy} factory.
+ * Possible checkpoint states. Ordinal is important. Every next state follows the previous one.
*/
-public class RandomLruPageReplacementPolicyFactory implements PageReplacementPolicyFactory {
- /** {@inheritDoc} */
- @Override
- public long requiredMemory(int pagesCnt) {
- return 0;
- }
+// TODO: IGNITE-16898 Review states
+public enum CheckpointState {
+ /** Checkpoint is waiting for execution. **/
+ SCHEDULED,
+
+ /** Checkpoint was awakened and it is preparing to start. **/
+ LOCK_TAKEN,
+
+ /** Dirty pages snapshot has been taken. **/
+ PAGE_SNAPSHOT_TAKEN,
+
+ /** Checkpoint counted the pages and write lock was released. **/
+ LOCK_RELEASED,
+
+ /** Checkpoint marker was stored to disk. **/
+ MARKER_STORED_TO_DISK,
- /** {@inheritDoc} */
- @Override
- public PageReplacementPolicy create(Segment seg, long ptr, int pagesCnt) {
- return new RandomLruPageReplacementPolicy(seg);
- }
+ /** Checkpoint was finished. **/
+ FINISHED
}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLock.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLock.java
new file mode 100644
index 000000000..d19f26be0
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLock.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.apache.ignite.internal.util.IgniteUtils.getUninterruptibly;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Checkpoint lock for external use, which should be held to protect data while it is being written to memory. It wraps the internal
+ * checkpoint read-write lock and adds the logic needed to take it correctly (acquisition timeout, forced checkpoint, node stop check).
+ */
+public class CheckpointTimeoutLock implements IgniteComponent {
+ /** Ignite logger. */
+ protected final IgniteLogger log;
+
+ /**
+ * {@link PageMemoryImpl#safeToUpdate() Safe update check} for all page memories, should return {@code false} if there are many dirty
+ * pages and a checkpoint is needed.
+ */
+ private final BooleanSupplier safeToUpdateAllPageMemories;
+
+ /** Internal checkpoint lock. */
+ private final CheckpointReadWriteLock checkpointReadWriteLock;
+
+ /** Service for triggering the checkpoint. */
+ private final Checkpointer checkpointer;
+
+ /** Timeout for checkpoint read lock acquisition in milliseconds, non-positive value denotes infinite timeout. */
+ private volatile long checkpointReadLockTimeout;
+
+ /** Stop flag, set under the checkpoint write lock in {@link #stop()} and checked after the read lock has been taken. */
+ private boolean stop;
+
+ /**
+ * Constructor.
+ *
+ * @param log Logger.
+ * @param checkpointReadWriteLock Checkpoint read-write lock.
+ * @param checkpointReadLockTimeout Timeout for checkpoint read lock acquisition in milliseconds.
+ * @param safeToUpdateAllPageMemories {@link PageMemoryImpl#safeToUpdate() Safe update check} for all page memories, should return
+ * {@code false} if there are many dirty pages and a checkpoint is needed.
+ * @param checkpointer Service for triggering the checkpoint.
+ */
+ public CheckpointTimeoutLock(
+ IgniteLogger log,
+ CheckpointReadWriteLock checkpointReadWriteLock,
+ long checkpointReadLockTimeout,
+ BooleanSupplier safeToUpdateAllPageMemories,
+ Checkpointer checkpointer
+ ) {
+ this.log = log;
+ this.checkpointReadWriteLock = checkpointReadWriteLock;
+ this.checkpointReadLockTimeout = checkpointReadLockTimeout;
+ this.safeToUpdateAllPageMemories = safeToUpdateAllPageMemories;
+ this.checkpointer = checkpointer;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void start() {
+ stop = false;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void stop() {
+ checkpointReadWriteLock.writeLock();
+
+ try {
+ stop = true;
+ } finally {
+ checkpointReadWriteLock.writeUnlock();
+ }
+ }
+
+ /**
+ * Gets the checkpoint read lock, does nothing if the current thread already holds the checkpoint write lock.
+ *
+ * @throws IgniteInternalException If the component has been stopped or waiting for the triggered checkpoint has failed.
+ * @throws CheckpointReadLockTimeoutException If failed to get checkpoint read lock by timeout.
+ */
+ public void checkpointReadLock() {
+ if (checkpointReadWriteLock.isWriteLockHeldByCurrentThread()) {
+ return;
+ }
+
+ long timeout = checkpointReadLockTimeout;
+
+ long start = coarseCurrentTimeMillis();
+
+ boolean interrupted = false;
+
+ try {
+ for (; ; ) {
+ try {
+ if (timeout > 0 && (coarseCurrentTimeMillis() - start) >= timeout) {
+ failCheckpointReadLock();
+ }
+
+ try {
+ if (timeout > 0) {
+ if (!checkpointReadWriteLock.tryReadLock(timeout - (coarseCurrentTimeMillis() - start), MILLISECONDS)) {
+ failCheckpointReadLock();
+ }
+ } else {
+ checkpointReadWriteLock.readLock();
+ }
+ } catch (InterruptedException e) {
+ interrupted = true; // The interrupt status is restored in the outer finally block.
+
+ continue;
+ }
+
+ if (stop) {
+ checkpointReadWriteLock.readUnlock();
+
+ throw new IgniteInternalException(new NodeStoppingException("Failed to get checkpoint read lock"));
+ }
+
+ if (checkpointReadWriteLock.getReadHoldCount() > 1
+ || safeToUpdateAllPageMemories.getAsBoolean()
+ || checkpointer.runner() == null
+ ) {
+ break; // Keep the lock: reentrant acquisition, no checkpoint is needed, or the checkpointer has not started yet.
+ } else {
+ // If the checkpoint is triggered outside the lock,
+ // it could cause the checkpoint to fire again for the same reason
+ // (due to a data race between collecting dirty pages and triggering the checkpoint).
+ CheckpointProgress checkpoint = checkpointer.scheduleCheckpoint(0, "too many dirty pages");
+
+ checkpointReadWriteLock.readUnlock();
+
+ if (timeout > 0 && coarseCurrentTimeMillis() - start >= timeout) {
+ failCheckpointReadLock();
+ }
+
+ try {
+ getUninterruptibly(checkpoint.futureFor(LOCK_RELEASED));
+ } catch (ExecutionException e) {
+ throw new IgniteInternalException("Failed to wait for checkpoint begin", e.getCause());
+ } catch (CancellationException e) {
+ throw new IgniteInternalException("Failed to wait for checkpoint begin", e);
+ }
+ }
+ } catch (CheckpointReadLockTimeoutException e) {
+ log.error(e.getMessage(), e);
+
+ throw e;
+
+ // TODO: IGNITE-16899 After the implementation of FailureProcessor,
+ // by analogy with 2.0, we need to reset the timeout and try again
+ //timeout = 0;
+ }
+ }
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+ * Tries to get a checkpoint read lock, without the timeout, stop and dirty pages checks of {@link #checkpointReadLock()}.
+ *
+ * @return {@code True} if the checkpoint read lock is acquired.
+ */
+ public boolean tryCheckpointReadLock() {
+ return checkpointReadWriteLock.tryReadLock();
+ }
+
+ /**
+ * Releases the checkpoint read lock.
+ */
+ public void checkpointReadUnlock() {
+ checkpointReadWriteLock.readUnlock();
+ }
+
+ /**
+ * Returns timeout for checkpoint read lock acquisition in milliseconds.
+ */
+ public long checkpointReadLockTimeout() {
+ return checkpointReadLockTimeout;
+ }
+
+ /**
+ * Sets timeout for checkpoint read lock acquisition.
+ *
+ * @param val New timeout in milliseconds, non-positive value denotes infinite timeout.
+ */
+ public void checkpointReadLockTimeout(long val) {
+ checkpointReadLockTimeout = val;
+ }
+
+ /**
+ * Returns {@code true} if checkpoint lock is held by current thread.
+ */
+ public boolean checkpointLockIsHeldByThread() {
+ return checkpointReadWriteLock.checkpointLockIsHeldByThread();
+ }
+
+ private void failCheckpointReadLock() throws CheckpointReadLockTimeoutException {
+ // TODO: IGNITE-16899 After the implementation of FailureProcessor, by analogy with 2.0,
+ // either fail the node or try to acquire the read lock again by throwing a CheckpointReadLockTimeoutException
+
+ throw new CheckpointReadLockTimeoutException("Checkpoint read lock acquisition has been timed out");
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageReplacementPolicyFactory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
similarity index 52%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageReplacementPolicyFactory.java
copy to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index 625c3579b..94056b669 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageReplacementPolicyFactory.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -15,27 +15,26 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.pagememory.persistence;
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
-import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.Segment;
+import org.jetbrains.annotations.Nullable;
/**
- * Page replacement policy factory.
+ * Service for scheduling checkpoints, currently only a placeholder with the methods needed by the already ported code.
*/
-public interface PageReplacementPolicyFactory {
+// TODO: IGNITE-16898 Continue porting the code
+public abstract class Checkpointer {
/**
- * Calculates amount of memory required to service {@code pagesCnt} pages.
+ * Changes the information for a scheduled checkpoint if it was scheduled further than {@code delayFromNow}, or does nothing otherwise.
*
- * @param pagesCnt Pages count.
+ * @param delayFromNow Delay from now in milliseconds.
+ * @param reason Wakeup reason.
+ * @return Nearest scheduled checkpoint which is not started yet (dirty pages weren't collected yet).
*/
- long requiredMemory(int pagesCnt);
+ public abstract CheckpointProgress scheduleCheckpoint(long delayFromNow, String reason);
/**
- * Create page replacement policy.
- *
- * @param seg Page memory segment.
- * @param ptr Pointer to memory region.
- * @param pagesCnt Pages count.
+ * Returns the runner thread, {@code null} if the worker has not yet started executing.
*/
- PageReplacementPolicy create(Segment seg, long ptr, int pagesCnt);
+ public abstract @Nullable Thread runner();
}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/ReentrantReadWriteLockWithTracking.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/ReentrantReadWriteLockWithTracking.java
new file mode 100644
index 000000000..da5a9a44b
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/ReentrantReadWriteLockWithTracking.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.lang.ThreadLocal.withInitial;
+import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+
+/**
+ * {@link ReentrantReadWriteLock} adapter that can optionally track how long a thread holds the read lock.
+ */
+public class ReentrantReadWriteLockWithTracking implements ReadWriteLock {
+ /** Delegate instance. */
+ private final ReentrantReadWriteLock delegate = new ReentrantReadWriteLock();
+
+ /** Read lock of the delegate, either a plain one or a {@link ReadLockWithTracking} depending on the constructor used. */
+ private final ReentrantReadWriteLock.ReadLock readLock;
+
+ /** Write lock of the delegate, it is never tracked. */
+ private final ReentrantReadWriteLock.WriteLock writeLock = new ReentrantReadWriteLock.WriteLock(delegate) {
+ };
+
+ /**
+ * Creates a lock with read lock tracking: a warning with a stack trace is logged on {@link ReadLockWithTracking#unlock()} if the
+ * thread has held the read lock for more than {@code readLockThreshold}.
+ *
+ * @param log Ignite logger.
+ * @param readLockThreshold Read lock hold threshold in milliseconds.
+ */
+ public ReentrantReadWriteLockWithTracking(IgniteLogger log, long readLockThreshold) {
+ readLock = new ReadLockWithTracking(delegate, log, readLockThreshold);
+ }
+
+ /**
+ * Creates a lock without read lock tracking, all calls are simply delegated.
+ */
+ public ReentrantReadWriteLockWithTracking() {
+ readLock = new ReentrantReadWriteLock.ReadLock(delegate) {
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ReentrantReadWriteLock.ReadLock readLock() {
+ return readLock;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ReentrantReadWriteLock.WriteLock writeLock() {
+ return writeLock;
+ }
+
+ /**
+ * Returns {@code true} if the current thread holds write lock and {@code false} otherwise.
+ */
+ public boolean isWriteLockedByCurrentThread() {
+ return delegate.isWriteLockedByCurrentThread();
+ }
+
+ /**
+ * Returns the number of reentrant read holds on this lock by the current thread. A reader thread has a hold on a lock for each lock
+ * action that is not matched by an unlock action.
+ */
+ public int getReadHoldCount() {
+ return delegate.getReadHoldCount();
+ }
+
+ /**
+ * Returns the number of read locks held for this lock. This method is designed for use in monitoring system state, not for
+ * synchronization control.
+ */
+ public int getReadLockCount() {
+ return delegate.getReadLockCount();
+ }
+
+ /**
+ * Tracks long read lock holders: keeps a per-thread pair of (hold count, time of the first acquisition) and warns on the last unlock.
+ */
+ private static class ReadLockWithTracking extends ReentrantReadWriteLock.ReadLock {
+ private static final long serialVersionUID = 0L;
+
+ private final ThreadLocal<IgniteBiTuple<Integer, Long>> readLockHolderTs = withInitial(() -> new IgniteBiTuple<>(0, 0L));
+
+ private final IgniteLogger log;
+
+ private final long readLockThreshold;
+
+ /**
+ * Constructor.
+ *
+ * @param lock Outer lock object.
+ * @param log Logger.
+ * @param readLockThreshold Lock print threshold in milliseconds.
+ */
+ protected ReadLockWithTracking(ReentrantReadWriteLock lock, IgniteLogger log, long readLockThreshold) {
+ super(lock);
+
+ this.log = log;
+
+ this.readLockThreshold = readLockThreshold;
+ }
+
+ private void inc() {
+ IgniteBiTuple<Integer, Long> val = readLockHolderTs.get();
+
+ int cntr = val.get1();
+
+ if (cntr == 0) { // First (non-reentrant) acquisition by this thread, remember when it happened.
+ val.set2(coarseCurrentTimeMillis());
+ }
+
+ val.set1(++cntr);
+
+ readLockHolderTs.set(val); // NOTE(review): stores the same instance that get() returned, looks redundant.
+ }
+
+ private void dec() {
+ IgniteBiTuple<Integer, Long> val = readLockHolderTs.get();
+
+ int cntr = val.get1();
+
+ if (--cntr == 0) { // The last hold of this thread has been released.
+ long timeout = coarseCurrentTimeMillis() - val.get2(); // Despite the name, this is the time the lock was held.
+
+ if (timeout > readLockThreshold) {
+ log.warn(
+ "ReadLock held the lock more than " + timeout + " ms.",
+ new IgniteInternalException()
+ );
+ }
+ }
+
+ val.set1(cntr);
+
+ readLockHolderTs.set(val); // NOTE(review): stores the same instance that get() returned, looks redundant.
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void lock() {
+ super.lock();
+
+ inc();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void lockInterruptibly() throws InterruptedException {
+ super.lockInterruptibly();
+
+ inc();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean tryLock() {
+ if (super.tryLock()) {
+ inc();
+
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
+ if (super.tryLock(timeout, unit)) {
+ inc();
+
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void unlock() {
+ super.unlock();
+
+ dec();
+ }
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementFlags.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/ClockPageReplacementFlags.java
similarity index 96%
rename from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementFlags.java
rename to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/ClockPageReplacementFlags.java
index 339ab2d21..2164ba389 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementFlags.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/ClockPageReplacementFlags.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.pagememory.persistence;
+package org.apache.ignite.internal.pagememory.persistence.replacement;
import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapLong;
import static org.apache.ignite.internal.util.GridUnsafe.getLong;
@@ -43,7 +43,7 @@ public class ClockPageReplacementFlags {
* @param totalPagesCnt Total pages count.
* @param memPtr Pointer to memory region.
*/
- ClockPageReplacementFlags(int totalPagesCnt, long memPtr) {
+ public ClockPageReplacementFlags(int totalPagesCnt, long memPtr) {
pagesCnt = totalPagesCnt;
flagsPtr = memPtr;
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicy.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/ClockPageReplacementPolicy.java
similarity index 93%
rename from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicy.java
rename to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/ClockPageReplacementPolicy.java
index ea29aed72..6926e7b47 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicy.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/ClockPageReplacementPolicy.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.pagememory.persistence;
+package org.apache.ignite.internal.pagememory.persistence.replacement;
import static org.apache.ignite.internal.pagememory.persistence.PageHeader.fullPageId;
import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.INVALID_REL_PTR;
@@ -23,6 +23,7 @@ import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.O
import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.LoadedPagesMap;
import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.Segment;
import org.apache.ignite.lang.IgniteInternalCheckedException;
@@ -40,7 +41,7 @@ public class ClockPageReplacementPolicy extends PageReplacementPolicy {
* @param ptr Pointer to memory region.
* @param pagesCnt Pages count.
*/
- protected ClockPageReplacementPolicy(Segment seg, long ptr, int pagesCnt) {
+ public ClockPageReplacementPolicy(Segment seg, long ptr, int pagesCnt) {
super(seg);
flags = new ClockPageReplacementFlags(pagesCnt, ptr);
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicyFactory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/ClockPageReplacementPolicyFactory.java
similarity index 84%
rename from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicyFactory.java
rename to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/ClockPageReplacementPolicyFactory.java
index 0eeb7dcc1..91b3a53c3 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicyFactory.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/ClockPageReplacementPolicyFactory.java
@@ -15,7 +15,9 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.pagememory.persistence;
+package org.apache.ignite.internal.pagememory.persistence.replacement;
+
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.Segment;
/**
* {@link ClockPageReplacementPolicy} factory.
@@ -29,7 +31,7 @@ public class ClockPageReplacementPolicyFactory implements PageReplacementPolicyF
/** {@inheritDoc} */
@Override
- public PageReplacementPolicy create(PageMemoryImpl.Segment seg, long ptr, int pagesCnt) {
+ public PageReplacementPolicy create(Segment seg, long ptr, int pagesCnt) {
return new ClockPageReplacementPolicy(seg, ptr, pagesCnt);
}
}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageStoreWrite.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageStoreWrite.java
new file mode 100644
index 000000000..500d51892
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageStoreWrite.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.replacement;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PageStoreWriter;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Stateful and not thread-safe class for the delayed write of one page being replaced. This allows writing the page content without
+ * holding the segment lock. Page data is copied into a temporary buffer during {@link #writePage(FullPageId, ByteBuffer, int)} and then
+ * sent to the real implementation by {@link #finishReplacement()}.
+ */
+// TODO: IGNITE-15818 Maybe refactor.
+public class DelayedDirtyPageStoreWrite implements PageStoreWriter {
+ /** Real flush dirty page implementation. */
+ private final PageStoreWriter flushDirtyPage;
+
+ /** Page size. */
+ private final int pageSize;
+
+ /** Thread local with byte buffers. */
+ private final ThreadLocal<ByteBuffer> byteBufThreadLoc;
+
+ /** Replacing pages tracker, used to register & unregister pages being written. */
+ private final DelayedPageReplacementTracker tracker;
+
+ /** Full page id to be written on {@link #finishReplacement()} or null if nothing to write. */
+ @Nullable
+ private FullPageId fullPageId;
+
+ /** Byte buffer with page data to be written on {@link #finishReplacement()} or null if nothing to write. */
+ @Nullable
+ private ByteBuffer byteBuf;
+
+ /** Partition update tag to be used in {@link #finishReplacement()} or {@code -1} if nothing to write. */
+ private int tag = -1;
+
+ /**
+ * Constructor.
+ *
+ * @param flushDirtyPage Real writer to save page to store.
+ * @param byteBufThreadLoc Thread local buffers to use for pages copying.
+ * @param pageSize Page size.
+ * @param tracker Tracker to lock/unlock page reads.
+ */
+ public DelayedDirtyPageStoreWrite(
+ PageStoreWriter flushDirtyPage,
+ ThreadLocal<ByteBuffer> byteBufThreadLoc,
+ int pageSize,
+ DelayedPageReplacementTracker tracker
+ ) {
+ this.flushDirtyPage = flushDirtyPage;
+ this.pageSize = pageSize;
+ this.byteBufThreadLoc = byteBufThreadLoc;
+ this.tracker = tracker;
+ }
+
+ /** {@inheritDoc} Locks the page in the tracker and copies its content to the thread-local buffer, the real write is deferred. */
+ @Override
+ public void writePage(FullPageId fullPageId, ByteBuffer byteBuf, int tag) {
+ tracker.lock(fullPageId);
+
+ ByteBuffer tlb = byteBufThreadLoc.get();
+
+ tlb.rewind();
+
+ long writeAddr = GridUnsafe.bufferAddress(tlb);
+ long origBufAddr = GridUnsafe.bufferAddress(byteBuf);
+
+ GridUnsafe.copyMemory(origBufAddr, writeAddr, pageSize);
+
+ this.fullPageId = fullPageId;
+ this.byteBuf = tlb;
+ this.tag = tag;
+ }
+
+ /**
+ * Runs actual write if required. Method is 'no op' if there was no page selected for replacement.
+ *
+ * @throws IgniteInternalCheckedException if write failed.
+ */
+ public void finishReplacement() throws IgniteInternalCheckedException {
+ if (byteBuf == null && fullPageId == null) { // Nothing was passed to writePage() since the last call.
+ return;
+ }
+
+ try {
+ flushDirtyPage.writePage(fullPageId, byteBuf, tag);
+ } finally {
+ tracker.unlock(fullPageId); // The page is unlocked even if the write has failed.
+
+ fullPageId = null;
+ byteBuf = null;
+ tag = -1;
+ }
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedPageReplacementTracker.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedPageReplacementTracker.java
new file mode 100644
index 000000000..12ec6b0c4
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedPageReplacementTracker.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.replacement;
+
+import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.segmentIndex;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PageStoreWriter;
+import org.apache.ignite.lang.IgniteLogger;
+
+/**
+ * Delayed page writes tracker. Provides delayed write implementations and allows checking if a page is still being written to page store.
+ */
+public class DelayedPageReplacementTracker {
+ /** Page size. */
+ private final int pageSize;
+
+ /** Flush dirty page real implementation. */
+ private final PageStoreWriter flushDirtyPage;
+
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /** Lock stripes for pages read protection. */
+ private final Stripe[] stripes;
+
+ /** Byte buffer thread local. */
+ private final ThreadLocal<ByteBuffer> byteBufThreadLoc = new ThreadLocal<>() {
+ /** {@inheritDoc} */
+ @Override
+ protected ByteBuffer initialValue() {
+ ByteBuffer buf = ByteBuffer.allocateDirect(pageSize);
+
+ buf.order(ByteOrder.nativeOrder());
+
+ return buf;
+ }
+ };
+
+ /**
+ * Per-thread dirty page writers for replacement operations, keyed by thread ID. Because {@link DelayedDirtyPageStoreWrite} is stateful
+ * and not thread safe, each thread reuses its own instance, which avoids GC pressure on page replacement. <br> Map is used instead of
+ * the built-in thread local to allow GC to remove delayed writers for alive threads after node stop.
+ */
+ private final Map<Long, DelayedDirtyPageStoreWrite> delayedPageWriteThreadLocMap = new ConcurrentHashMap<>();
+
+ /**
+ * Constructor.
+ *
+ * @param pageSize Page size.
+ * @param flushDirtyPage Flush dirty page.
+ * @param log Logger.
+ * @param segmentCnt Segments count.
+ */
+ public DelayedPageReplacementTracker(
+ int pageSize,
+ PageStoreWriter flushDirtyPage,
+ IgniteLogger log,
+ int segmentCnt
+ ) {
+ this.pageSize = pageSize;
+ this.flushDirtyPage = flushDirtyPage;
+ this.log = log;
+
+ stripes = new Stripe[segmentCnt];
+
+ for (int i = 0; i < stripes.length; i++) {
+ stripes[i] = new Stripe();
+ }
+ }
+
+ /**
+ * Returns the delayed page writer of the current thread, {@link DelayedDirtyPageStoreWrite#finishReplacement()} writes the page.
+ */
+ public DelayedDirtyPageStoreWrite delayedPageWrite() {
+ return delayedPageWriteThreadLocMap.computeIfAbsent(Thread.currentThread().getId(),
+ id -> new DelayedDirtyPageStoreWrite(flushDirtyPage, byteBufThreadLoc, pageSize, this));
+ }
+
+ /**
+ * Returns stripe related to current page identifier.
+ *
+ * @param id Full page ID.
+ */
+ private Stripe stripe(FullPageId id) {
+ int segmentIdx = segmentIndex(id.groupId(), id.pageId(), stripes.length);
+
+ return stripes[segmentIdx];
+ }
+
+ /**
+ * Locks the page.
+ *
+ * @param id Full page ID to lock from read.
+ */
+ public void lock(FullPageId id) {
+ stripe(id).lock(id);
+ }
+
+ /**
+ * Returns at once if the page is available to be loaded from store, otherwise waits until its replacement is finished.
+ *
+ * @param id Full page ID to be loaded from store.
+ */
+ public void waitUnlock(FullPageId id) {
+ stripe(id).waitUnlock(id);
+ }
+
+ /**
+ * Unlocks the page.
+ *
+ * @param id Full page ID, which write has been finished, it is available for reading.
+ */
+ public void unlock(FullPageId id) {
+ stripe(id).unlock(id);
+ }
+
+ /**
+ * Stripe for locking pages from reading from store in parallel with not finished write.
+ */
+ private class Stripe {
+ /**
+ * Page IDs which are locked for reading from store. Page content is being written right now. Guarded by collection object monitor.
+ */
+ private final Collection<FullPageId> locked = new HashSet<>(Runtime.getRuntime().availableProcessors() * 2);
+
+ /**
+ * Has locked pages, flag for a fast check whether there are pages that were replaced and are being written. Write to field is
+ * guarded by {@link #locked} monitor.
+ */
+ private volatile boolean hasLockedPages;
+
+ /**
+ * Locks the page.
+ *
+ * @param id Full page ID to lock from read.
+ */
+ public void lock(FullPageId id) {
+ synchronized (locked) {
+ hasLockedPages = true;
+
+ boolean add = locked.add(id);
+
+ assert add : "Double locking of page for replacement is not possible";
+ }
+ }
+
+ /**
+ * Returns at once if the page is available to be loaded from store, otherwise waits until its replacement is finished.
+ *
+ * @param id Full page ID to be loaded from store.
+ */
+ public void waitUnlock(FullPageId id) {
+ if (!hasLockedPages) {
+ return;
+ }
+
+ synchronized (locked) {
+ if (!hasLockedPages) { // Re-check under the monitor.
+ return;
+ }
+
+ boolean interrupted = false;
+
+ while (locked.contains(id)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Found replaced page [" + id + "] which is being written to page store, wait for finish replacement");
+ }
+
+ try {
+ // Uninterruptible wait: an interrupt is only remembered here, the flag is restored after the loop.
+ locked.wait();
+ } catch (InterruptedException e) {
+ interrupted = true;
+ }
+ }
+
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+ * Unlocks the page and wakes up all threads waiting on this stripe.
+ *
+ * @param id Full page ID, which write has been finished, it is available for reading.
+ */
+ public void unlock(FullPageId id) {
+ synchronized (locked) {
+ boolean rmv = locked.remove(id);
+
+ assert rmv : "Unlocking page ID never locked, id " + id;
+
+ if (locked.isEmpty()) {
+ hasLockedPages = false;
+ }
+
+ locked.notifyAll();
+ }
+ }
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageReplacementPolicy.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/PageReplacementPolicy.java
similarity index 96%
rename from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageReplacementPolicy.java
rename to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/PageReplacementPolicy.java
index 6cca28c86..ba2170b5e 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageReplacementPolicy.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/PageReplacementPolicy.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.pagememory.persistence;
+package org.apache.ignite.internal.pagememory.persistence.replacement;
import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.Segment;
import org.apache.ignite.lang.IgniteInternalCheckedException;
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageReplacementPolicyFactory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/PageReplacementPolicyFactory.java
similarity index 95%
rename from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageReplacementPolicyFactory.java
rename to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/PageReplacementPolicyFactory.java
index 625c3579b..387c9988d 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageReplacementPolicyFactory.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/PageReplacementPolicyFactory.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.pagememory.persistence;
+package org.apache.ignite.internal.pagememory.persistence.replacement;
import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.Segment;
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicy.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/RandomLruPageReplacementPolicy.java
similarity index 92%
rename from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicy.java
rename to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/RandomLruPageReplacementPolicy.java
index 8ecae86bb..8ce0471ca 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicy.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/RandomLruPageReplacementPolicy.java
@@ -15,9 +15,10 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.pagememory.persistence;
+package org.apache.ignite.internal.pagememory.persistence.replacement;
import static org.apache.ignite.internal.pagememory.PageIdAllocator.META_PAGE_ID;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.fullPageId;
import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.INVALID_REL_PTR;
import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.PAGE_OVERHEAD;
import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
@@ -28,7 +29,11 @@ import java.util.concurrent.ThreadLocalRandom;
import org.apache.ignite.internal.pagememory.FullPageId;
import org.apache.ignite.internal.pagememory.freelist.io.PagesListMetaIo;
import org.apache.ignite.internal.pagememory.io.PageIo;
+import org.apache.ignite.internal.pagememory.persistence.LoadedPagesMap;
+import org.apache.ignite.internal.pagememory.persistence.PageHeader;
import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.Segment;
+import org.apache.ignite.internal.pagememory.persistence.PagePool;
+import org.apache.ignite.internal.pagememory.persistence.ReplaceCandidate;
import org.apache.ignite.lang.IgniteInternalCheckedException;
/**
@@ -95,7 +100,7 @@ public class RandomLruPageReplacementPolicy extends PageReplacementPolicy {
final long absPageAddr = seg.absolute(rndAddr);
- FullPageId fullId = PageHeader.fullPageId(absPageAddr);
+ FullPageId fullId = fullPageId(absPageAddr);
// Check page mapping consistency.
assert fullId.equals(nearest.fullId()) : "Invalid page mapping [tableId=" + nearest.fullId()
@@ -152,7 +157,7 @@ public class RandomLruPageReplacementPolicy extends PageReplacementPolicy {
final long absRmvAddr = seg.absolute(relRmvAddr);
- final FullPageId fullPageId = PageHeader.fullPageId(absRmvAddr);
+ final FullPageId fullPageId = fullPageId(absRmvAddr);
if (!seg.tryToRemovePage(fullPageId, absRmvAddr)) {
if (iterations > 10) {
@@ -214,7 +219,7 @@ public class RandomLruPageReplacementPolicy extends PageReplacementPolicy {
final long absPageAddr = seg.absolute(addr);
- FullPageId fullId = PageHeader.fullPageId(absPageAddr);
+ FullPageId fullId = fullPageId(absPageAddr);
if (partGen < seg.partGeneration(fullId.groupId(), partitionId(fullId.pageId()))) {
return seg.refreshOutdatedPage(fullId.groupId(), fullId.pageId(), true);
@@ -228,7 +233,7 @@ public class RandomLruPageReplacementPolicy extends PageReplacementPolicy {
final long absEvictAddr = seg.absolute(addr);
- final FullPageId fullPageId = PageHeader.fullPageId(absEvictAddr);
+ final FullPageId fullPageId = fullPageId(absEvictAddr);
if (seg.tryToRemovePage(fullPageId, absEvictAddr)) {
return addr;
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicyFactory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/RandomLruPageReplacementPolicyFactory.java
similarity index 94%
rename from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicyFactory.java
rename to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/RandomLruPageReplacementPolicyFactory.java
index 651cc14df..9db7446c1 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicyFactory.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/RandomLruPageReplacementPolicyFactory.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.pagememory.persistence;
+package org.apache.ignite.internal.pagememory.persistence.replacement;
import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.Segment;
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/SegmentedLruPageList.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/SegmentedLruPageList.java
similarity index 99%
rename from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/SegmentedLruPageList.java
rename to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/SegmentedLruPageList.java
index c37cee4a5..bc84f6ebf 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/SegmentedLruPageList.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/SegmentedLruPageList.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.pagememory.persistence;
+package org.apache.ignite.internal.pagememory.persistence.replacement;
import static org.apache.ignite.internal.util.GridUnsafe.getInt;
import static org.apache.ignite.internal.util.GridUnsafe.getLong;
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/SegmentedLruPageReplacementPolicy.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/SegmentedLruPageReplacementPolicy.java
similarity index 96%
rename from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/SegmentedLruPageReplacementPolicy.java
rename to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/SegmentedLruPageReplacementPolicy.java
index 247830f2a..9f1a250aa 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/SegmentedLruPageReplacementPolicy.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/SegmentedLruPageReplacementPolicy.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.pagememory.persistence;
+package org.apache.ignite.internal.pagememory.persistence.replacement;
import static org.apache.ignite.internal.pagememory.persistence.PageHeader.fullPageId;
import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.INVALID_REL_PTR;
@@ -23,6 +23,7 @@ import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.O
import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.LoadedPagesMap;
import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.Segment;
import org.apache.ignite.lang.IgniteInternalCheckedException;
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/SegmentedLruPageReplacementPolicyFactory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/SegmentedLruPageReplacementPolicyFactory.java
similarity index 84%
rename from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/SegmentedLruPageReplacementPolicyFactory.java
rename to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/SegmentedLruPageReplacementPolicyFactory.java
index 4e1234a83..5c076e1d6 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/SegmentedLruPageReplacementPolicyFactory.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/SegmentedLruPageReplacementPolicyFactory.java
@@ -15,7 +15,9 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.pagememory.persistence;
+package org.apache.ignite.internal.pagememory.persistence.replacement;
+
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.Segment;
/**
* {@link SegmentedLruPageReplacementPolicy} factory.
@@ -29,7 +31,7 @@ public class SegmentedLruPageReplacementPolicyFactory implements PageReplacement
/** {@inheritDoc} */
@Override
- public PageReplacementPolicy create(PageMemoryImpl.Segment seg, long ptr, int pagesCnt) {
+ public PageReplacementPolicy create(Segment seg, long ptr, int pagesCnt) {
return new SegmentedLruPageReplacementPolicy(seg, ptr, pagesCnt);
}
}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoLoadSelfTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoLoadSelfTest.java
index 3b5aaa914..d74eb9107 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoLoadSelfTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoLoadSelfTest.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.pagememory.util.PageIdUtils;
import org.apache.ignite.internal.pagememory.util.PageUtils;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -62,6 +63,13 @@ public class PageMemoryNoLoadSelfTest extends BaseIgniteAbstractTest {
@InjectConfiguration(polymorphicExtensions = UnsafeMemoryAllocatorConfigurationSchema.class)
protected PageMemoryDataRegionConfiguration dataRegionCfg;
+ @BeforeEach
+ void setUp() throws Exception {
+ dataRegionCfg
+ .change(cfg -> cfg.changePageSize(PAGE_SIZE).changeInitSize(MAX_MEMORY_SIZE).changeMaxSize(MAX_MEMORY_SIZE))
+ .get(1, SECONDS); // Waits at most one second for the configuration change to complete.
+ }
+
@Test
public void testPageTearingInner() throws Exception {
PageMemory mem = memory();
@@ -302,12 +310,6 @@ public class PageMemoryNoLoadSelfTest extends BaseIgniteAbstractTest {
* @throws Exception If failed.
*/
protected PageMemory memory() throws Exception {
- dataRegionCfg.change(cfg ->
- cfg.changePageSize(PAGE_SIZE)
- .changeInitSize(MAX_MEMORY_SIZE)
- .changeMaxSize(MAX_MEMORY_SIZE)
- ).get(1, SECONDS);
-
DirectMemoryProvider provider = new UnsafeMemoryProvider(null);
PageIoRegistry ioRegistry = new PageIoRegistry();
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImplNoLoadTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImplNoLoadTest.java
index 10682d5ea..35ee3233d 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImplNoLoadTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImplNoLoadTest.java
@@ -18,9 +18,17 @@
package org.apache.ignite.internal.pagememory.persistence;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.PAGE_OVERHEAD;
import static org.apache.ignite.internal.util.Constants.MiB;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.util.Set;
import java.util.stream.LongStream;
+import org.apache.ignite.internal.pagememory.FullPageId;
import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.impl.PageMemoryNoLoadSelfTest;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
@@ -33,11 +41,11 @@ import org.junit.jupiter.api.Test;
public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest {
/** {@inheritDoc} */
@Override
- protected PageMemory memory() throws Exception {
- dataRegionCfg
- .change(cfg -> cfg.changePageSize(PAGE_SIZE).changeInitSize(MAX_MEMORY_SIZE).changeMaxSize(MAX_MEMORY_SIZE))
- .get(1, SECONDS);
+ protected PageMemory memory() {
+ return memory(LongStream.range(0, 10).map(i -> 5 * MiB).toArray());
+ }
+ protected PageMemoryImpl memory(long[] sizes) {
PageIoRegistry ioRegistry = new PageIoRegistry();
ioRegistry.loadFromServiceLoader();
@@ -46,9 +54,11 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest {
new UnsafeMemoryProvider(null),
dataRegionCfg,
ioRegistry,
- LongStream.range(0, 10).map(i -> 5 * MiB).toArray(),
+ sizes,
new TestPageReadWriteManager(),
(page, fullPageId, pageMemoryEx) -> {
+ },
+ (fullPageId, buf, tag) -> {
}
);
}
@@ -59,4 +69,58 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest {
public void testPageHandleDeallocation() {
// No-op.
}
+
+ @Test
+ void testDirtyPages() throws Exception {
+ PageMemoryImpl memory = (PageMemoryImpl) memory();
+
+ memory.start();
+
+ try {
+ Set<FullPageId> dirtyPages = Set.of(allocatePage(memory), allocatePage(memory));
+ // Exactly the two pages allocated above are expected to be reported as dirty.
+ assertThat(memory.dirtyPages(), equalTo(dirtyPages));
+
+ // TODO: IGNITE-16898 After the checkpoint check that there are no dirty pages
+ } finally {
+ memory.stop(true);
+ }
+ }
+
+ @Test
+ void testSafeToUpdate() throws Exception {
+ long systemPageSize = PAGE_SIZE + PAGE_OVERHEAD;
+ // Use a small region: both the initial and the maximum size are 128 system pages.
+ dataRegionCfg
+ .change(c -> c.changeInitSize(128 * systemPageSize).changeMaxSize(128 * systemPageSize))
+ .get(1, SECONDS);
+
+ PageMemoryImpl memory = memory(new long[]{100 * systemPageSize, 28 * systemPageSize});
+
+ memory.start();
+
+ try {
+ long maxPages = memory.totalPages();
+ // The test expects updates to become unsafe once 3/4 of all pages have been allocated.
+ long maxDirtyPages = (maxPages * 3 / 4);
+
+ assertThat(maxDirtyPages, greaterThanOrEqualTo(50L));
+ // Below the limit: every allocation must leave the memory safe to update.
+ for (int i = 0; i < maxDirtyPages - 1; i++) {
+ allocatePage(memory);
+
+ assertTrue(memory.safeToUpdate(), "i=" + i);
+ }
+ // From the limit on: after every further allocation the memory must be reported as not safe to update.
+ for (int i = (int) maxDirtyPages - 1; i < maxPages; i++) {
+ allocatePage(memory);
+
+ assertFalse(memory.safeToUpdate(), "i=" + i);
+ }
+
+ // TODO: IGNITE-16898 After the checkpoint check assertTrue(memory.safeToUpdate())
+ } finally {
+ memory.stop(true);
+ }
+ }
}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesTest.java
new file mode 100644
index 000000000..8ec9518d8
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.junit.jupiter.api.Test;
+
+/**
+ * For {@link CheckpointPages} testing.
+ */
+public class CheckpointPagesTest {
+ @Test
+ void testContains() {
+ CheckpointPages checkpointPages = new CheckpointPages(
+ Set.of(new FullPageId(0, 0), new FullPageId(1, 0)),
+ completedFuture(null)
+ );
+
+ assertTrue(checkpointPages.contains(new FullPageId(0, 0)));
+ assertTrue(checkpointPages.contains(new FullPageId(1, 0)));
+
+ assertFalse(checkpointPages.contains(new FullPageId(2, 0)));
+ assertFalse(checkpointPages.contains(new FullPageId(3, 0)));
+ }
+
+ @Test
+ void testSize() {
+ CheckpointPages checkpointPages = new CheckpointPages(
+ Set.of(new FullPageId(0, 0), new FullPageId(1, 0)),
+ completedFuture(null)
+ );
+
+ assertEquals(2, checkpointPages.size());
+ }
+
+ @Test
+ void testMarkAsSaved() {
+ CheckpointPages checkpointPages = new CheckpointPages(
+ new HashSet<>(Set.of(new FullPageId(0, 0), new FullPageId(1, 0), new FullPageId(2, 0))),
+ completedFuture(null)
+ );
+
+ assertTrue(checkpointPages.markAsSaved(new FullPageId(0, 0)));
+ assertFalse(checkpointPages.contains(new FullPageId(0, 0)));
+ assertEquals(2, checkpointPages.size());
+
+ assertFalse(checkpointPages.markAsSaved(new FullPageId(0, 0)));
+ assertFalse(checkpointPages.contains(new FullPageId(0, 0)));
+ assertEquals(2, checkpointPages.size());
+
+ assertTrue(checkpointPages.markAsSaved(new FullPageId(1, 0)));
+ assertFalse(checkpointPages.contains(new FullPageId(0, 0)));
+ assertEquals(1, checkpointPages.size());
+ }
+
+ @Test
+ void testAllowToSave() throws Exception {
+ Set<FullPageId> pages = Set.of(new FullPageId(0, 0), new FullPageId(1, 0), new FullPageId(2, 0));
+
+ CheckpointPages checkpointPages = new CheckpointPages(pages, completedFuture(null));
+
+ assertTrue(checkpointPages.allowToSave(new FullPageId(0, 0)));
+ assertTrue(checkpointPages.allowToSave(new FullPageId(1, 0)));
+ assertTrue(checkpointPages.allowToSave(new FullPageId(2, 0)));
+
+ assertFalse(checkpointPages.allowToSave(new FullPageId(3, 0)));
+
+ IgniteInternalCheckedException exception = assertThrows(
+ IgniteInternalCheckedException.class,
+ () -> new CheckpointPages(pages, failedFuture(new Exception("test"))).allowToSave(new FullPageId(0, 0))
+ );
+
+ assertThat(exception.getCause(), instanceOf(Exception.class));
+ assertThat(exception.getCause().getMessage(), equalTo("test"));
+
+ exception = assertThrows(
+ IgniteInternalCheckedException.class,
+ () -> {
+ CompletableFuture<Object> future = new CompletableFuture<>();
+
+ future.cancel(true);
+
+ new CheckpointPages(pages, future).allowToSave(new FullPageId(0, 0));
+ }
+ );
+
+ assertThat(exception.getCause(), instanceOf(CancellationException.class));
+ }
+}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLockTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLockTest.java
new file mode 100644
index 000000000..500c87fb6
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLockTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.Test;
+
+/**
+ * For {@link CheckpointReadWriteLock} testing.
+ */
+public class CheckpointReadWriteLockTest {
+ @Test
+ void testReadLock() throws Exception {
+ CheckpointReadWriteLock lock0 = new CheckpointReadWriteLock(new ReentrantReadWriteLockWithTracking());
+ CheckpointReadWriteLock lock1 = new CheckpointReadWriteLock(new ReentrantReadWriteLockWithTracking());
+
+ lock1.writeLock();
+
+ lock0.readLock();
+ lock1.readLock();
+
+ assertEquals(1, lock0.getReadHoldCount());
+ assertEquals(0, lock1.getReadHoldCount());
+
+ lock1.writeUnlock();
+
+ runAsync(() -> {
+ assertEquals(0, lock0.getReadHoldCount());
+ assertEquals(0, lock1.getReadHoldCount());
+
+ lock0.readLock();
+ lock1.readLock();
+
+ assertEquals(1, lock0.getReadHoldCount());
+ assertEquals(1, lock1.getReadHoldCount());
+
+ lock0.readUnlock();
+ lock1.readUnlock();
+
+ assertEquals(0, lock0.getReadHoldCount());
+ assertEquals(0, lock1.getReadHoldCount());
+ }).get(1, TimeUnit.SECONDS);
+
+ lock1.writeLock();
+
+ assertEquals(1, lock0.getReadHoldCount());
+ assertEquals(0, lock1.getReadHoldCount());
+
+ lock0.readUnlock();
+ lock1.readUnlock();
+
+ assertEquals(0, lock0.getReadHoldCount());
+ assertEquals(0, lock1.getReadHoldCount());
+ }
+
+ @Test
+ void testTryReadLock() throws Exception {
+ CheckpointReadWriteLock lock0 = new CheckpointReadWriteLock(new ReentrantReadWriteLockWithTracking());
+ CheckpointReadWriteLock lock1 = new CheckpointReadWriteLock(new ReentrantReadWriteLockWithTracking());
+ CheckpointReadWriteLock lock2 = new CheckpointReadWriteLock(new ReentrantReadWriteLockWithTracking());
+
+ lock2.writeLock();
+
+ assertTrue(lock0.tryReadLock());
+ assertTrue(lock1.tryReadLock(1, TimeUnit.MILLISECONDS));
+
+ assertTrue(lock2.tryReadLock());
+ assertTrue(lock2.tryReadLock(1, TimeUnit.MILLISECONDS));
+
+ assertEquals(1, lock0.getReadHoldCount());
+ assertEquals(1, lock1.getReadHoldCount());
+ assertEquals(0, lock2.getReadHoldCount());
+
+ runAsync(() -> {
+ assertEquals(0, lock0.getReadHoldCount());
+ assertEquals(0, lock1.getReadHoldCount());
+ assertEquals(0, lock2.getReadHoldCount());
+
+ assertFalse(lock2.tryReadLock());
+
+ try {
+ assertFalse(lock2.tryReadLock(1, TimeUnit.MILLISECONDS));
+ } catch (InterruptedException e) {
+ fail(e);
+ }
+
+ assertEquals(0, lock0.getReadHoldCount());
+ assertEquals(0, lock1.getReadHoldCount());
+ assertEquals(0, lock2.getReadHoldCount());
+ }).get(1, TimeUnit.SECONDS);
+
+ lock2.writeUnlock();
+
+ runAsync(() -> {
+ try {
+ assertTrue(lock0.tryReadLock());
+ assertTrue(lock1.tryReadLock(1, TimeUnit.MILLISECONDS));
+
+ assertTrue(lock2.tryReadLock());
+ assertTrue(lock2.tryReadLock(1, TimeUnit.MILLISECONDS));
+
+ assertEquals(1, lock0.getReadHoldCount());
+ assertEquals(1, lock1.getReadHoldCount());
+ assertEquals(2, lock2.getReadHoldCount());
+
+ lock0.readUnlock();
+ lock1.readUnlock();
+ lock2.readUnlock();
+
+ assertEquals(0, lock0.getReadHoldCount());
+ assertEquals(0, lock1.getReadHoldCount());
+ assertEquals(1, lock2.getReadHoldCount());
+
+ lock2.readUnlock();
+
+ assertEquals(0, lock2.getReadHoldCount());
+ } catch (InterruptedException e) {
+ fail(e);
+ }
+ }).get(1, TimeUnit.SECONDS);
+
+ lock2.writeLock();
+
+ assertEquals(1, lock0.getReadHoldCount());
+ assertEquals(1, lock1.getReadHoldCount());
+ assertEquals(0, lock2.getReadHoldCount());
+
+ lock0.readUnlock();
+ lock1.readUnlock();
+ lock2.readUnlock();
+
+ assertEquals(0, lock0.getReadHoldCount());
+ assertEquals(0, lock1.getReadHoldCount());
+ assertEquals(0, lock2.getReadHoldCount());
+ }
+
+ @Test
+ void testCheckpointLockIsHeldByThread() throws Exception {
+ CheckpointReadWriteLock lock0 = new CheckpointReadWriteLock(new ReentrantReadWriteLockWithTracking());
+ CheckpointReadWriteLock lock1 = new CheckpointReadWriteLock(new ReentrantReadWriteLockWithTracking());
+ CheckpointReadWriteLock lock2 = new CheckpointReadWriteLock(new ReentrantReadWriteLockWithTracking());
+
+ assertFalse(lock0.checkpointLockIsHeldByThread());
+ assertFalse(lock1.checkpointLockIsHeldByThread());
+ assertFalse(lock2.checkpointLockIsHeldByThread());
+
+ lock0.writeLock();
+ lock1.readLock();
+
+ assertTrue(lock0.checkpointLockIsHeldByThread());
+ assertTrue(lock1.checkpointLockIsHeldByThread());
+
+ runAsync(() -> assertTrue(lock2.checkpointLockIsHeldByThread()), "checkpoint-runner").get(1, TimeUnit.SECONDS);
+
+ runAsync(() -> {
+ assertFalse(lock0.checkpointLockIsHeldByThread());
+ assertFalse(lock1.checkpointLockIsHeldByThread());
+ assertFalse(lock2.checkpointLockIsHeldByThread());
+ }).get(1, TimeUnit.SECONDS);
+
+ runAsync(() -> {
+ assertFalse(lock0.tryReadLock());
+
+ try {
+ assertFalse(lock0.tryReadLock(1, TimeUnit.MILLISECONDS));
+ } catch (InterruptedException e) {
+ fail(e);
+ }
+
+ assertFalse(lock0.checkpointLockIsHeldByThread());
+ }).get(1, TimeUnit.SECONDS);
+
+ lock0.writeUnlock();
+ lock1.readUnlock();
+
+ assertFalse(lock0.checkpointLockIsHeldByThread());
+ assertFalse(lock1.checkpointLockIsHeldByThread());
+ assertFalse(lock2.checkpointLockIsHeldByThread());
+ }
+}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLockTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLockTest.java
new file mode 100644
index 000000000..43cba5504
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLockTest.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.lang.Thread.currentThread;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * For {@link CheckpointTimeoutLock} testing.
+ */
+public class CheckpointTimeoutLockTest {
+ private final IgniteLogger log = IgniteLogger.forClass(CheckpointTimeoutLockTest.class);
+
+ @Nullable
+ private CheckpointTimeoutLock timeoutLock;
+
+ /**
+ * Releases any read locks still held on the lock stored in {@link #timeoutLock} and stops it, if a test has set it.
+ */
+ @AfterEach
+ void tearDown() {
+ CheckpointTimeoutLock lock = timeoutLock;
+
+ if (lock == null) {
+ return;
+ }
+
+ readUnlock(lock);
+
+ lock.stop();
+ }
+
+ /**
+ * Checks that the read lock timeout getter returns the constructor value and then the value passed to the setter.
+ */
+ @Test
+ void testCheckpointReadLockTimeout() {
+ CheckpointTimeoutLock lock = new CheckpointTimeoutLock(log, newReadWriteLock(), Long.MAX_VALUE, () -> true, mock(Checkpointer.class));
+
+ // Publish to the field right away so that tearDown() cleans the lock up whatever happens below.
+ timeoutLock = lock;
+
+ lock.start();
+
+ assertEquals(Long.MAX_VALUE, lock.checkpointReadLockTimeout());
+
+ lock.checkpointReadLockTimeout(Long.MIN_VALUE);
+
+ assertEquals(Long.MIN_VALUE, lock.checkpointReadLockTimeout());
+ }
+
+ /**
+ * Checks that the checkpoint read lock can be taken, including reentrantly, with a timeout of 0 and of 1.
+ */
+ @Test
+ void testCheckpointReadLock() throws Exception {
+ CheckpointTimeoutLock untimedLock = new CheckpointTimeoutLock(log, newReadWriteLock(), 0, () -> true, mock(Checkpointer.class));
+ CheckpointTimeoutLock timedLock = new CheckpointTimeoutLock(log, newReadWriteLock(), 1, () -> true, mock(Checkpointer.class));
+
+ try {
+ untimedLock.start();
+ timedLock.start();
+
+ // The first pass takes each lock, the second pass checks the reentrant acquisition.
+ for (int pass = 0; pass < 2; pass++) {
+ assertDoesNotThrow(untimedLock::checkpointReadLock);
+ assertDoesNotThrow(timedLock::checkpointReadLock);
+ }
+ } finally {
+ readUnlock(untimedLock);
+ readUnlock(timedLock);
+
+ closeAll(untimedLock::stop, timedLock::stop);
+ }
+ }
+
+ /**
+ * Checks that the checkpoint read lock can be requested, twice in a row, by a thread that already holds the write lock.
+ */
+ @Test
+ void testCheckpointReadLockWithWriteLockHeldByCurrentThread() {
+ CheckpointReadWriteLock rwLock = newReadWriteLock();
+
+ timeoutLock = new CheckpointTimeoutLock(log, rwLock, 1, () -> true, mock(Checkpointer.class));
+
+ timeoutLock.start();
+
+ rwLock.writeLock();
+
+ try {
+ // The first attempt is a plain acquisition under the write lock, the second one checks reentrancy.
+ for (int attempt = 0; attempt < 2; attempt++) {
+ assertDoesNotThrow(timeoutLock::checkpointReadLock);
+ }
+ } finally {
+ writeUnlock(rwLock);
+ }
+ }
+
+ /**
+ * Checks that taking the checkpoint read lock after {@code stop()} fails with a {@link NodeStoppingException} cause.
+ */
+ @Test
+ void testCheckpointReadLockFailOnNodeStop() {
+ timeoutLock = new CheckpointTimeoutLock(log, newReadWriteLock(), Long.MAX_VALUE, () -> true, mock(Checkpointer.class));
+
+ // The lock is stopped without ever being started.
+ timeoutLock.stop();
+
+ Throwable cause = assertThrows(IgniteInternalException.class, timeoutLock::checkpointReadLock).getCause();
+
+ assertThat(cause, instanceOf(NodeStoppingException.class));
+ }
+
+ @Test
+ void testCheckpointReadLockTimeoutFail() throws Exception {
+ CheckpointReadWriteLock readWriteLock0 = newReadWriteLock();
+ CheckpointReadWriteLock readWriteLock1 = newReadWriteLock();
+
+ CheckpointTimeoutLock timeoutLock0 = new CheckpointTimeoutLock(log, readWriteLock0, 0, () -> true, mock(Checkpointer.class));
+ CheckpointTimeoutLock timeoutLock1 = new CheckpointTimeoutLock(log, readWriteLock1, 1, () -> true, mock(Checkpointer.class));
+
+ try {
+ timeoutLock0.start();
+ timeoutLock1.start();
+
+ readWriteLock0.writeLock();
+ readWriteLock1.writeLock();
+
+ CountDownLatch startThreadLatch = new CountDownLatch(2);
+
+ CompletableFuture<?> readLockFuture0 = runAsync(() -> checkpointReadLock(startThreadLatch, timeoutLock0));
+ CompletableFuture<?> readLockFuture1 = runAsync(() -> checkpointReadLock(startThreadLatch, timeoutLock1));
+
+ assertTrue(startThreadLatch.await(100, MILLISECONDS));
+
+ // For the Windows case, getting a read lock can take up to 100 ms.
+ assertThrows(TimeoutException.class, () -> readLockFuture0.get(100, MILLISECONDS));
+
+ ExecutionException exception = assertThrows(ExecutionException.class, () -> readLockFuture1.get(100, MILLISECONDS));
+
+ assertThat(exception.getCause().getCause(), instanceOf(CheckpointReadLockTimeoutException.class));
+ } finally {
+ writeUnlock(readWriteLock0);
+ writeUnlock(readWriteLock1);
+
+ closeAll(timeoutLock0::stop, timeoutLock1::stop);
+ }
+ }
+
+ @Test
+ void testCheckpointReadLockTimeoutWithInterruptionFail() throws Exception {
+ CheckpointReadWriteLock readWriteLock0 = newReadWriteLock();
+ CheckpointReadWriteLock readWriteLock1 = newReadWriteLock();
+
+ CheckpointTimeoutLock timeoutLock0 = new CheckpointTimeoutLock(log, readWriteLock0, 0, () -> true, mock(Checkpointer.class));
+ CheckpointTimeoutLock timeoutLock1 = new CheckpointTimeoutLock(log, readWriteLock1, 1, () -> true, mock(Checkpointer.class));
+
+ try {
+ timeoutLock0.start();
+ timeoutLock1.start();
+
+ readWriteLock0.writeLock();
+ readWriteLock1.writeLock();
+
+ CountDownLatch startThreadLatch = new CountDownLatch(2);
+ CountDownLatch interruptedThreadLatch = new CountDownLatch(2);
+
+ CompletableFuture<?> readLockFuture0 = runAsync(() -> {
+ currentThread().interrupt();
+
+ try {
+ checkpointReadLock(startThreadLatch, timeoutLock0);
+ } finally {
+ interruptedThreadLatch.countDown();
+ }
+ });
+
+ CompletableFuture<?> readLockFuture1 = runAsync(() -> {
+ currentThread().interrupt();
+
+ try {
+ checkpointReadLock(startThreadLatch, timeoutLock1);
+ } finally {
+ interruptedThreadLatch.countDown();
+ }
+ });
+
+ assertTrue(startThreadLatch.await(100, MILLISECONDS));
+
+ // For the Windows case, getting a read lock can take up to 100 ms.
+ assertThrows(TimeoutException.class, () -> readLockFuture0.get(100, MILLISECONDS));
+
+ ExecutionException exception = assertThrows(ExecutionException.class, () -> readLockFuture1.get(100, MILLISECONDS));
+
+ assertThat(exception.getCause().getCause(), instanceOf(CheckpointReadLockTimeoutException.class));
+
+ writeUnlock(readWriteLock0);
+ writeUnlock(readWriteLock1);
+
+ assertTrue(interruptedThreadLatch.await(100, MILLISECONDS));
+ } finally {
+ writeUnlock(readWriteLock0);
+ writeUnlock(readWriteLock1);
+
+ closeAll(timeoutLock0::stop, timeoutLock1::stop);
+ }
+ }
+
+ /**
+ * Checks that, while the "safe to update" supplier returns {@code false}, taking the checkpoint read lock leads to
+ * {@code get()} being called on the future returned by the mocked checkpoint progress, and that the acquisition
+ * completes once the supplier starts returning {@code true}.
+ */
+ @Test
+ void testScheduleCheckpoint() throws Exception {
+ CountDownLatch futureAwaited = new CountDownLatch(1);
+
+ CompletableFuture<?> lockReleasedFuture = mock(CompletableFuture.class);
+
+ // Signal the test thread when get() is invoked on the mocked future.
+ when(lockReleasedFuture.get()).then(invocation -> {
+ futureAwaited.countDown();
+
+ return null;
+ });
+
+ Checkpointer checkpointer = newCheckpointer(currentThread(), lockReleasedFuture);
+
+ AtomicBoolean safeToUpdate = new AtomicBoolean();
+
+ timeoutLock = new CheckpointTimeoutLock(log, newReadWriteLock(), 0, safeToUpdate::get, checkpointer);
+
+ timeoutLock.start();
+
+ CompletableFuture<?> reader = runAsync(() -> {
+ timeoutLock.checkpointReadLock();
+
+ timeoutLock.checkpointReadUnlock();
+ });
+
+ assertTrue(futureAwaited.await(100, MILLISECONDS));
+
+ safeToUpdate.set(true);
+
+ reader.get(100, MILLISECONDS);
+ }
+
+ /**
+ * Checks that a failure of the future returned by the mocked checkpoint progress is rethrown from
+ * {@code checkpointReadLock()} as an {@link IgniteInternalException} carrying the original exception as its cause.
+ */
+ @Test
+ void testFailureLockReleasedFuture() {
+ CompletableFuture<?> failed = failedFuture(new Exception("test"));
+
+ Checkpointer checkpointer = newCheckpointer(currentThread(), failed);
+
+ timeoutLock = new CheckpointTimeoutLock(log, newReadWriteLock(), 0, () -> false, checkpointer);
+
+ timeoutLock.start();
+
+ Throwable cause = assertThrows(IgniteInternalException.class, timeoutLock::checkpointReadLock).getCause();
+
+ assertThat(cause, instanceOf(Exception.class));
+ assertThat(cause.getMessage(), equalTo("test"));
+ }
+
+ /**
+ * Checks that a cancelled future returned by the mocked checkpoint progress makes {@code checkpointReadLock()} fail
+ * with an {@link IgniteInternalException} whose cause is a {@link CancellationException}.
+ */
+ @Test
+ void testCanceledLockReleasedFuture() {
+ CompletableFuture<?> future = new CompletableFuture<>();
+
+ // Cancelling completes the future exceptionally with a CancellationException.
+ future.cancel(true);
+
+ Checkpointer checkpointer = newCheckpointer(currentThread(), future);
+
+ timeoutLock = new CheckpointTimeoutLock(log, newReadWriteLock(), 0, () -> false, checkpointer);
+
+ timeoutLock.start();
+
+ IgniteInternalException exception = assertThrows(IgniteInternalException.class, timeoutLock::checkpointReadLock);
+
+ assertThat(exception.getCause(), instanceOf(CancellationException.class));
+ }
+
+ /**
+ * Counts the latch down, then takes and immediately releases the checkpoint read lock.
+ *
+ * @param latch Latch counted down before the lock is requested, so a waiter can tell that this call has started.
+ * @param lock Lock to acquire and release.
+ */
+ private void checkpointReadLock(CountDownLatch latch, CheckpointTimeoutLock lock) {
+ latch.countDown();
+
+ lock.checkpointReadLock();
+
+ lock.checkpointReadUnlock();
+ }
+
+ /**
+ * Releases the write lock, but only if the current thread holds it; otherwise does nothing.
+ *
+ * @param lock Lock whose write lock should be released.
+ */
+ private void writeUnlock(CheckpointReadWriteLock lock) {
+ if (!lock.isWriteLockHeldByCurrentThread()) {
+ return;
+ }
+
+ lock.writeUnlock();
+ }
+
+ /**
+ * Releases the checkpoint read lock repeatedly until {@code checkpointLockIsHeldByThread()} reports {@code false}.
+ *
+ * @param lock Lock to release.
+ */
+ private void readUnlock(CheckpointTimeoutLock lock) {
+ while (lock.checkpointLockIsHeldByThread()) {
+ lock.checkpointReadUnlock();
+ }
+ }
+
+ /**
+ * Creates a checkpoint read-write lock backed by a {@link ReentrantReadWriteLockWithTracking} that is built with the
+ * test logger and a value of {@code 5_000}.
+ *
+ * @return New lock instance.
+ */
+ private CheckpointReadWriteLock newReadWriteLock() {
+ ReentrantReadWriteLockWithTracking trackingLock = new ReentrantReadWriteLockWithTracking(log, 5_000);
+
+ return new CheckpointReadWriteLock(trackingLock);
+ }
+
+ /**
+ * Creates a mocked checkpoint progress that returns the given future for the {@code LOCK_RELEASED} state.
+ *
+ * @param future Future to return from {@code futureFor(LOCK_RELEASED)}.
+ * @return Mocked checkpoint progress.
+ */
+ private CheckpointProgress newCheckpointProgress(CompletableFuture<?> future) {
+ CheckpointProgress progressMock = mock(CheckpointProgress.class);
+
+ when(progressMock.futureFor(LOCK_RELEASED)).then(invocation -> future);
+
+ return progressMock;
+ }
+
+ /**
+ * Creates a mocked checkpointer that reports the given runner thread and, when a checkpoint is scheduled with
+ * {@code (0, "too many dirty pages")}, returns a progress whose {@code LOCK_RELEASED} future is the given one.
+ *
+ * @param runner Thread to return from {@code runner()}.
+ * @param future Future exposed through the mocked checkpoint progress.
+ * @return Mocked checkpointer.
+ */
+ private Checkpointer newCheckpointer(Thread runner, CompletableFuture<?> future) {
+ CheckpointProgress scheduledProgress = newCheckpointProgress(future);
+
+ Checkpointer checkpointerMock = mock(Checkpointer.class);
+
+ when(checkpointerMock.runner()).thenReturn(runner);
+
+ when(checkpointerMock.scheduleCheckpoint(0, "too many dirty pages")).thenReturn(scheduledProgress);
+
+ return checkpointerMock;
+ }
+
+ /**
+ * Creates a mocked data region with the given persistence flag; its mocked page memory answers
+ * {@code safeToUpdate()} with the current value of the given flag.
+ *
+ * @param persistent Value to return from {@code persistent()}.
+ * @param safeToUpdate Flag read on every {@code safeToUpdate()} call.
+ * @return Mocked data region.
+ */
+ private PageMemoryDataRegion newPageMemoryDataRegion(boolean persistent, AtomicBoolean safeToUpdate) {
+ PageMemoryImpl pageMemoryMock = mock(PageMemoryImpl.class);
+
+ when(pageMemoryMock.safeToUpdate()).then(invocation -> safeToUpdate.get());
+
+ PageMemoryDataRegion regionMock = mock(PageMemoryDataRegion.class);
+
+ when(regionMock.persistent()).thenReturn(persistent);
+
+ when(regionMock.pageMemory()).thenReturn(pageMemoryMock);
+
+ return regionMock;
+ }
+}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/ReentrantReadWriteLockWithTrackingTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/ReentrantReadWriteLockWithTrackingTest.java
new file mode 100644
index 000000000..6089b83a6
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/ReentrantReadWriteLockWithTrackingTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.hamcrest.Matchers;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+/**
+ * For {@link ReentrantReadWriteLockWithTracking} testing.
+ */
+public class ReentrantReadWriteLockWithTrackingTest {
+ private final IgniteLogger log = IgniteLogger.forClass(getClass());
+
+ /**
+ * Checks that {@code isWriteLockedByCurrentThread()} is {@code true} only for the thread holding the write lock.
+ */
+ @Test
+ void testIsWriteLockedByCurrentThread() throws Exception {
+ ReentrantReadWriteLockWithTracking plainLock = new ReentrantReadWriteLockWithTracking();
+ ReentrantReadWriteLockWithTracking loggingLock = new ReentrantReadWriteLockWithTracking(log, Long.MAX_VALUE);
+
+ assertFalse(plainLock.isWriteLockedByCurrentThread());
+ assertFalse(loggingLock.isWriteLockedByCurrentThread());
+
+ plainLock.writeLock().lock();
+ loggingLock.writeLock().lock();
+
+ assertTrue(plainLock.isWriteLockedByCurrentThread());
+ assertTrue(loggingLock.isWriteLockedByCurrentThread());
+
+ // Another thread must not see itself as the owner.
+ runAsync(() -> {
+ assertFalse(plainLock.isWriteLockedByCurrentThread());
+ assertFalse(loggingLock.isWriteLockedByCurrentThread());
+ }).get(1, TimeUnit.SECONDS);
+
+ plainLock.writeLock().unlock();
+ loggingLock.writeLock().unlock();
+
+ assertFalse(plainLock.isWriteLockedByCurrentThread());
+ assertFalse(loggingLock.isWriteLockedByCurrentThread());
+ }
+
+ /**
+ * Checks that {@code getReadHoldCount()} counts read lock holds per thread.
+ */
+ @Test
+ void testGetReadHoldCount() throws Exception {
+ ReentrantReadWriteLockWithTracking plainLock = new ReentrantReadWriteLockWithTracking();
+ ReentrantReadWriteLockWithTracking loggingLock = new ReentrantReadWriteLockWithTracking(log, Long.MAX_VALUE);
+
+ assertEquals(0, plainLock.getReadHoldCount());
+ assertEquals(0, loggingLock.getReadHoldCount());
+
+ // The test thread takes the first lock twice and the second lock once.
+ plainLock.readLock().lock();
+ plainLock.readLock().lock();
+
+ loggingLock.readLock().lock();
+
+ assertEquals(2, plainLock.getReadHoldCount());
+ assertEquals(1, loggingLock.getReadHoldCount());
+
+ // Another thread starts from zero and only sees its own holds.
+ runAsync(() -> {
+ assertEquals(0, plainLock.getReadHoldCount());
+ assertEquals(0, loggingLock.getReadHoldCount());
+
+ plainLock.readLock().lock();
+ loggingLock.readLock().lock();
+
+ assertEquals(1, plainLock.getReadHoldCount());
+ assertEquals(1, loggingLock.getReadHoldCount());
+
+ plainLock.readLock().unlock();
+ loggingLock.readLock().unlock();
+
+ assertEquals(0, plainLock.getReadHoldCount());
+ assertEquals(0, loggingLock.getReadHoldCount());
+ }).get(1, TimeUnit.SECONDS);
+
+ // The counts of the test thread are unaffected by the other thread.
+ assertEquals(2, plainLock.getReadHoldCount());
+ assertEquals(1, loggingLock.getReadHoldCount());
+
+ plainLock.readLock().unlock();
+ loggingLock.readLock().unlock();
+
+ assertEquals(1, plainLock.getReadHoldCount());
+ assertEquals(0, loggingLock.getReadHoldCount());
+
+ plainLock.readLock().unlock();
+
+ assertEquals(0, plainLock.getReadHoldCount());
+ assertEquals(0, loggingLock.getReadHoldCount());
+ }
+
+ /**
+ * Checks that {@code getReadLockCount()} counts read lock holds across all threads.
+ */
+ @Test
+ void testGetReadLockCount() throws Exception {
+ ReentrantReadWriteLockWithTracking plainLock = new ReentrantReadWriteLockWithTracking();
+ ReentrantReadWriteLockWithTracking loggingLock = new ReentrantReadWriteLockWithTracking(log, Long.MAX_VALUE);
+
+ assertEquals(0, plainLock.getReadLockCount());
+ assertEquals(0, loggingLock.getReadLockCount());
+
+ // The test thread takes the first lock twice and the second lock once.
+ plainLock.readLock().lock();
+ plainLock.readLock().lock();
+
+ loggingLock.readLock().lock();
+
+ assertEquals(2, plainLock.getReadLockCount());
+ assertEquals(1, loggingLock.getReadLockCount());
+
+ // Another thread sees the holds of the test thread and adds its own on top.
+ runAsync(() -> {
+ assertEquals(2, plainLock.getReadLockCount());
+ assertEquals(1, loggingLock.getReadLockCount());
+
+ plainLock.readLock().lock();
+ loggingLock.readLock().lock();
+
+ assertEquals(3, plainLock.getReadLockCount());
+ assertEquals(2, loggingLock.getReadLockCount());
+
+ plainLock.readLock().unlock();
+ loggingLock.readLock().unlock();
+
+ assertEquals(2, plainLock.getReadLockCount());
+ assertEquals(1, loggingLock.getReadLockCount());
+ }).get(1, TimeUnit.SECONDS);
+
+ assertEquals(2, plainLock.getReadLockCount());
+ assertEquals(1, loggingLock.getReadLockCount());
+
+ plainLock.readLock().unlock();
+ loggingLock.readLock().unlock();
+
+ assertEquals(1, plainLock.getReadLockCount());
+ assertEquals(0, loggingLock.getReadLockCount());
+
+ plainLock.readLock().unlock();
+
+ assertEquals(0, plainLock.getReadLockCount());
+ assertEquals(0, loggingLock.getReadLockCount());
+ }
+
+ /**
+ * Checks that exactly one warning is logged when two read locks, created with the values 20 and 200, are both held
+ * across a 50 ms sleep.
+ */
+ @Test
+ void testPrintLongHoldReadLock() throws Exception {
+ IgniteLogger mockedLog = mock(IgniteLogger.class);
+
+ ArgumentCaptor<String> warnMessages = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<Throwable> warnErrors = ArgumentCaptor.forClass(Throwable.class);
+
+ // Capture every warn(message, throwable) call made on the mocked logger.
+ doNothing().when(mockedLog).warn(warnMessages.capture(), warnErrors.capture());
+
+ ReentrantReadWriteLockWithTracking lowThresholdLock = new ReentrantReadWriteLockWithTracking(mockedLog, 20);
+ ReentrantReadWriteLockWithTracking highThresholdLock = new ReentrantReadWriteLockWithTracking(mockedLog, 200);
+
+ lowThresholdLock.readLock().lock();
+ highThresholdLock.readLock().lock();
+
+ Thread.sleep(50);
+
+ lowThresholdLock.readLock().unlock();
+ highThresholdLock.readLock().unlock();
+
+ // Only one of the two locks is expected to have produced a warning.
+ assertThat(warnMessages.getAllValues(), hasSize(1));
+ assertThat(warnErrors.getAllValues(), hasSize(1));
+
+ assertThat(warnMessages.getValue(), Matchers.startsWith("ReadLock held the lock more than"));
+ assertThat(warnErrors.getValue(), instanceOf(IgniteInternalException.class));
+ }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryDataRegion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryDataRegion.java
similarity index 88%
rename from modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryDataRegion.java
rename to modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryDataRegion.java
index d194a3a04..dd0b0e9ca 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryDataRegion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryDataRegion.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.storage.pagememory;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryDataRegionConfiguration;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.jetbrains.annotations.Nullable;
@@ -27,7 +28,7 @@ import org.jetbrains.annotations.Nullable;
* Abstract data region for {@link PageMemoryStorageEngine}. Based on a {@link PageMemory}.
*/
// TODO: IGNITE-16641 Add support for persistent case.
-abstract class PageMemoryDataRegion implements IgniteComponent {
+abstract class AbstractPageMemoryDataRegion implements PageMemoryDataRegion, IgniteComponent {
protected final PageMemoryDataRegionConfiguration cfg;
protected final PageIoRegistry ioRegistry;
@@ -40,7 +41,7 @@ abstract class PageMemoryDataRegion implements IgniteComponent {
* @param cfg Data region configuration.
* @param ioRegistry IO registry.
*/
- public PageMemoryDataRegion(PageMemoryDataRegionConfiguration cfg, PageIoRegistry ioRegistry) {
+ public AbstractPageMemoryDataRegion(PageMemoryDataRegionConfiguration cfg, PageIoRegistry ioRegistry) {
this.cfg = cfg;
this.ioRegistry = ioRegistry;
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryPartitionStorage.java
index 3d25357b2..3ec2da275 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryPartitionStorage.java
@@ -72,7 +72,7 @@ class PageMemoryPartitionStorage implements PartitionStorage {
public PageMemoryPartitionStorage(
int partId,
TableConfiguration tableCfg,
- PageMemoryDataRegion dataRegion,
+ AbstractPageMemoryDataRegion dataRegion,
TableFreeList freeList
) throws StorageException {
assert partId >= 0 && partId < MAX_PARTITION_ID : partId;
@@ -409,6 +409,7 @@ class PageMemoryPartitionStorage implements PartitionStorage {
}
}
+ /** {@inheritDoc} */
@Override
public long rowsCount() {
try {
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryTableStorage.java
index 7f587de58..a109c5524 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryTableStorage.java
@@ -39,7 +39,7 @@ import org.jetbrains.annotations.Nullable;
// TODO: IGNITE-16641 Add support for persistent case.
// TODO: IGNITE-16642 Support indexes.
abstract class PageMemoryTableStorage implements TableStorage {
- protected final PageMemoryDataRegion dataRegion;
+ protected final AbstractPageMemoryDataRegion dataRegion;
protected final TableConfiguration tableCfg;
@@ -56,7 +56,7 @@ abstract class PageMemoryTableStorage implements TableStorage {
* @param tableCfg – Table configuration.
* @param dataRegion – Data region for the table.
*/
- public PageMemoryTableStorage(TableConfiguration tableCfg, PageMemoryDataRegion dataRegion) {
+ public PageMemoryTableStorage(TableConfiguration tableCfg, AbstractPageMemoryDataRegion dataRegion) {
this.dataRegion = dataRegion;
this.tableCfg = tableCfg;
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java
index 01f0b539b..cd44241a7 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java
@@ -33,9 +33,9 @@ import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.lang.IgniteInternalCheckedException;
/**
- * Implementation of {@link PageMemoryDataRegion} for in-memory case.
+ * Implementation of {@link AbstractPageMemoryDataRegion} for in-memory case.
*/
-class VolatilePageMemoryDataRegion extends PageMemoryDataRegion {
+class VolatilePageMemoryDataRegion extends AbstractPageMemoryDataRegion {
private TableFreeList freeList;
/**