You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/03/02 10:17:48 UTC
[ignite-3] branch main updated: IGNITE-16560 [Native Persistence 3.0] PageMemoryImpl porting (#667)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f83ae45 IGNITE-16560 [Native Persistence 3.0] PageMemoryImpl porting (#667)
f83ae45 is described below
commit f83ae45e5449fce3ebe20e1410366d2e5cd86d36
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Wed Mar 2 13:16:07 2022 +0300
IGNITE-16560 [Native Persistence 3.0] PageMemoryImpl porting (#667)
---
.../apache/ignite/internal/util/IgniteUtils.java | 17 +
.../persistence/ItBplusTreePageMemoryImplTest.java | 71 +
.../ItBplusTreeReuseListPageMemoryImplTest.java | 70 +
.../pagememory/tree/ItBplusTreeSelfTest.java | 21 +-
.../internal/pagememory/PageIdAllocator.java | 5 +
.../persistence/ClockPageReplacementFlags.java | 158 ++
.../persistence/ClockPageReplacementPolicy.java | 102 ++
.../ClockPageReplacementPolicyFactory.java | 35 +
.../pagememory/persistence/GroupPartitionId.java | 114 ++
.../pagememory/persistence/LoadedPagesMap.java | 137 ++
.../pagememory/persistence/PageHeader.java | 298 ++++
.../pagememory/persistence/PageMemoryEx.java | 119 ++
.../pagememory/persistence/PageMemoryImpl.java | 1601 ++++++++++++++++++++
.../internal/pagememory/persistence/PagePool.java | 277 ++++
.../persistence/PageReadWriteManager.java | 59 +
.../persistence/PageReplacementPolicy.java | 78 +
.../persistence/PageReplacementPolicyFactory.java | 41 +
.../RandomLruPageReplacementPolicy.java | 242 +++
.../RandomLruPageReplacementPolicyFactory.java | 37 +
.../pagememory/persistence/ReplaceCandidate.java | 79 +
.../persistence/RobinHoodBackwardShiftHashMap.java | 700 +++++++++
.../persistence/SegmentedLruPageList.java | 365 +++++
.../SegmentedLruPageReplacementPolicy.java | 111 ++
.../SegmentedLruPageReplacementPolicyFactory.java | 35 +
.../pagememory/impl/PageMemoryNoLoadSelfTest.java | 7 +-
.../persistence/PageMemoryImplNoLoadTest.java | 68 +
.../persistence/TestPageReadWriteManager.java | 57 +
27 files changed, 4895 insertions(+), 9 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index d179485..2e201a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -253,6 +253,23 @@ public class IgniteUtils {
}
/**
+ * Returns the hex representation of {@code len} bytes of memory starting at {@code addr}, in address order.
+ *
+ * @param addr Pointer in memory.
+ * @param len Number of bytes to read.
+ */
+ public static String toHexString(long addr, int len) {
+ StringBuilder sb = new StringBuilder(len * 2);
+
+ for (int i = 0; i < len; i++) {
+ // Read byte by byte: a getLong read would come out byte-swapped on little-endian platforms.
+ addByteAsHex(sb, GridUnsafe.getByte(addr + i));
+ }
+
+ return sb.toString();
+ }
+
+ /**
* Appends {@code byte} in hexadecimal format.
*
* @param sb String builder.
diff --git a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreePageMemoryImplTest.java b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreePageMemoryImplTest.java
new file mode 100644
index 0000000..eefb3b8
--- /dev/null
+++ b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreePageMemoryImplTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static org.apache.ignite.internal.configuration.ConfigurationTestUtils.fixConfiguration;
+import static org.apache.ignite.internal.util.Constants.MiB;
+
+import java.util.concurrent.TimeUnit;
+import java.util.stream.LongStream;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionChange;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfiguration;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.TestPageIoRegistry;
+import org.apache.ignite.internal.pagememory.mem.unsafe.UnsafeMemoryProvider;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.ItBplusTreeSelfTest;
+
+/**
+ * Runs the inherited {@link BplusTree} tests with a {@link PageMemoryImpl} as the page memory.
+ */
+public class ItBplusTreePageMemoryImplTest extends ItBplusTreeSelfTest {
+ /** {@inheritDoc} */
+ @Override
+ protected PageMemory createPageMemory() throws Exception {
+ dataRegionCfg.change(c ->
+ c.convert(PageMemoryDataRegionChange.class)
+ .changePageSize(PAGE_SIZE)
+ .changeInitSize(MAX_MEMORY_SIZE)
+ .changeMaxSize(MAX_MEMORY_SIZE)
+ ).get(1, TimeUnit.SECONDS);
+ // One entry of MAX_MEMORY_SIZE / CPUS bytes per CPU, plus one extra entry at index CPUS.
+ long[] sizes = LongStream.range(0, CPUS + 1).map(i -> MAX_MEMORY_SIZE / CPUS).toArray();
+ // NOTE(review): the extra last entry presumably sizes the checkpoint buffer - confirm against PageMemoryImpl.
+ sizes[CPUS] = 10 * MiB;
+
+ TestPageIoRegistry ioRegistry = new TestPageIoRegistry();
+
+ ioRegistry.loadFromServiceLoader();
+ // The trailing lambda is a no-op (page, fullPageId, pageMemoryEx) callback; NOTE(review): looks like a change-tracking hook.
+ return new PageMemoryImpl(
+ new UnsafeMemoryProvider(null),
+ (PageMemoryDataRegionConfiguration) fixConfiguration(dataRegionCfg),
+ ioRegistry,
+ sizes,
+ new TestPageReadWriteManager(),
+ (page, fullPageId, pageMemoryEx) -> {
+ }
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected long acquiredPages() {
+ return ((PageMemoryImpl) pageMem).acquiredPages();
+ }
+}
diff --git a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreeReuseListPageMemoryImplTest.java b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreeReuseListPageMemoryImplTest.java
new file mode 100644
index 0000000..0a25faa
--- /dev/null
+++ b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreeReuseListPageMemoryImplTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static org.apache.ignite.internal.configuration.ConfigurationTestUtils.fixConfiguration;
+import static org.apache.ignite.internal.util.Constants.MiB;
+
+import java.util.concurrent.TimeUnit;
+import java.util.stream.LongStream;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionChange;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfiguration;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.TestPageIoRegistry;
+import org.apache.ignite.internal.pagememory.mem.unsafe.UnsafeMemoryProvider;
+import org.apache.ignite.internal.pagememory.tree.ItBplusTreeReuseSelfTest;
+
+/**
+ * Runs the inherited reuse-list {@link ItBplusTreeReuseSelfTest} tests with a {@link PageMemoryImpl} as the page memory.
+ */
+public class ItBplusTreeReuseListPageMemoryImplTest extends ItBplusTreeReuseSelfTest {
+ /** {@inheritDoc} */
+ @Override
+ protected PageMemory createPageMemory() throws Exception {
+ dataRegionCfg.change(c ->
+ c.convert(PageMemoryDataRegionChange.class)
+ .changePageSize(PAGE_SIZE)
+ .changeInitSize(MAX_MEMORY_SIZE)
+ .changeMaxSize(MAX_MEMORY_SIZE)
+ ).get(1, TimeUnit.SECONDS);
+ // One entry of MAX_MEMORY_SIZE / CPUS bytes per CPU, plus one extra entry at index CPUS.
+ long[] sizes = LongStream.range(0, CPUS + 1).map(i -> MAX_MEMORY_SIZE / CPUS).toArray();
+ // NOTE(review): the extra last entry presumably sizes the checkpoint buffer - confirm against PageMemoryImpl.
+ sizes[CPUS] = 10 * MiB;
+
+ TestPageIoRegistry ioRegistry = new TestPageIoRegistry();
+
+ ioRegistry.loadFromServiceLoader();
+ // The trailing lambda is a no-op (page, fullPageId, pageMemoryEx) callback; NOTE(review): looks like a change-tracking hook.
+ return new PageMemoryImpl(
+ new UnsafeMemoryProvider(null),
+ (PageMemoryDataRegionConfiguration) fixConfiguration(dataRegionCfg),
+ ioRegistry,
+ sizes,
+ new TestPageReadWriteManager(),
+ (page, fullPageId, pageMemoryEx) -> {
+ }
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected long acquiredPages() {
+ return ((PageMemoryImpl) pageMem).acquiredPages();
+ }
+}
diff --git a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/ItBplusTreeSelfTest.java b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/ItBplusTreeSelfTest.java
index d308048..a01ab91 100644
--- a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/ItBplusTreeSelfTest.java
+++ b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/ItBplusTreeSelfTest.java
@@ -35,7 +35,7 @@ import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.runMultiThreaded;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.runMultiThreadedAsync;
-import static org.apache.ignite.internal.util.Constants.MiB;
+import static org.apache.ignite.internal.util.Constants.GiB;
import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -135,6 +135,10 @@ public class ItBplusTreeSelfTest extends BaseIgniteAbstractTest {
private static int RMV_INC = 1;
+ /** Page size of the test data region. */
+ protected static final int PAGE_SIZE = 512;
+
+ /** Initial and maximum size of the test data region. */
+ protected static final long MAX_MEMORY_SIZE = GiB;
+
/** Forces printing lock/unlock events on the test tree. */
private static boolean PRINT_LOCKS = false;
@@ -146,7 +150,7 @@ public class ItBplusTreeSelfTest extends BaseIgniteAbstractTest {
PageMemoryDataRegionConfigurationSchema.class,
UnsafeMemoryAllocatorConfigurationSchema.class
})
- private DataRegionConfiguration dataRegionCfg;
+ protected DataRegionConfiguration dataRegionCfg;
@Nullable
protected PageMemory pageMem;
@@ -2724,12 +2728,17 @@ public class ItBplusTreeSelfTest extends BaseIgniteAbstractTest {
}
}
- private PageMemory createPageMemory() throws Exception {
+ /**
+ * Returns page memory.
+ *
+ * @throws Exception If failed.
+ */
+ protected PageMemory createPageMemory() throws Exception {
dataRegionCfg.change(c ->
c.convert(PageMemoryDataRegionChange.class)
- .changePageSize(512)
- .changeInitSize(1024 * MiB)
- .changeMaxSize(1024 * MiB)
+ .changePageSize(PAGE_SIZE)
+ .changeInitSize(MAX_MEMORY_SIZE)
+ .changeMaxSize(MAX_MEMORY_SIZE)
).get(1, TimeUnit.SECONDS);
TestPageIoRegistry ioRegistry = new TestPageIoRegistry();
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageIdAllocator.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageIdAllocator.java
index 05dc2b2..5f96ac7 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageIdAllocator.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageIdAllocator.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.pagememory;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+
import org.apache.ignite.lang.IgniteInternalCheckedException;
/**
@@ -44,6 +46,9 @@ public interface PageIdAllocator {
*/
int INDEX_PARTITION = 0xFFFF;
+ /** Group meta page ID: the page ID built from {@link #INDEX_PARTITION}, {@link #FLAG_AUX} and 0. */
+ long META_PAGE_ID = pageId(INDEX_PARTITION, FLAG_AUX, 0);
+
/**
* Allocates a page from the space for the given partition ID and the given flags.
*
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementFlags.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementFlags.java
new file mode 100644
index 0000000..339ab2d
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementFlags.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapLong;
+import static org.apache.ignite.internal.util.GridUnsafe.getLong;
+import static org.apache.ignite.internal.util.GridUnsafe.putLong;
+import static org.apache.ignite.internal.util.GridUnsafe.zeroMemory;
+
+import java.util.function.LongUnaryOperator;
+
+/**
+ * Page hit flags for the CLOCK replacement algorithm: one bit per page, packed into 64-bit words of a raw memory region.
+ */
+public class ClockPageReplacementFlags {
+ /** Total pages count. */
+ private final int pagesCnt;
+
+ /** Index of the next candidate ("hand"). */
+ private int curIdx;
+
+ /** Pointer to memory region to store page hit flags. */
+ private final long flagsPtr;
+
+ /**
+ * Constructor. Clears the hit flags of all {@code totalPagesCnt} pages.
+ *
+ * @param totalPagesCnt Total pages count.
+ * @param memPtr Pointer to the flags memory region; NOTE(review): expected to hold {@link #requiredMemory(int)} bytes.
+ */
+ ClockPageReplacementFlags(int totalPagesCnt, long memPtr) {
+ pagesCnt = totalPagesCnt;
+ flagsPtr = memPtr;
+ // Zeroes ceil(totalPagesCnt / 8) bytes only; leftover bits in the last word map to no page and are never returned by poll().
+ zeroMemory(flagsPtr, (totalPagesCnt + 7) >> 3);
+ }
+
+ /**
+ * Advances the clock hand to the next page whose hit flag is clear, clearing the flags of the pages it passes over.
+ *
+ * @return Page index to replace.
+ */
+ public int poll() {
+ // This method is always executed under exclusive lock, no other synchronization or CAS required.
+ while (true) {
+ if (curIdx >= pagesCnt) {
+ curIdx = 0;
+ }
+ // Address of the 64-bit word holding curIdx's bit: byte offset curIdx / 8 aligned down to a multiple of 8.
+ long ptr = flagsPtr + ((curIdx >> 3) & (~7L));
+
+ long flags = getLong(ptr);
+ // Fast path: a word-aligned hand facing 64 set flags clears them all and skips the whole word.
+ if (((curIdx & 63) == 0) && (flags == ~0L)) {
+ putLong(ptr, 0L);
+
+ curIdx += 64;
+
+ continue;
+ }
+ // A long shift distance is taken modulo 64, so the mask selects bits [curIdx % 64, 63] of this word.
+ long mask = ~0L << curIdx;
+ // First clear flag at or after the hand within this word, or 64 if there is none.
+ int bitIdx = Long.numberOfTrailingZeros(~flags & mask);
+ // bitIdx == 64: every flag from the hand to the end of the word is set, so clear them and move to the next word.
+ if (bitIdx == 64) {
+ putLong(ptr, flags & ~mask);
+
+ curIdx = (curIdx & ~63) + 64;
+ } else {
+ mask &= ~(~0L << bitIdx);
+ // Clear the flags of the skipped pages [hand, bitIdx); the selected page's flag is already clear.
+ putLong(ptr, flags & ~mask);
+
+ curIdx = (curIdx & ~63) + bitIdx + 1;
+ // A bit past the last page (possible in the final word) is not a page: wrap around instead of returning it.
+ if (curIdx <= pagesCnt) {
+ return curIdx - 1;
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns whether the hit flag of the given page is set.
+ *
+ * @param pageIdx Page index.
+ */
+ boolean getFlag(int pageIdx) {
+ long flags = getLong(flagsPtr + ((pageIdx >> 3) & (~7L)));
+
+ return (flags & (1L << pageIdx)) != 0L;
+ }
+
+ /**
+ * Clears the hit flag of the given page with a CAS on its 64-bit flags word.
+ *
+ * @param pageIdx Page index.
+ */
+ public void clearFlag(int pageIdx) {
+ compareAndSwapFlag(pageIdx, flags -> flags & ~(1L << pageIdx));
+ }
+
+ /**
+ * Sets the hit flag of the given page with a CAS on its 64-bit flags word.
+ *
+ * @param pageIdx Page index.
+ */
+ public void setFlag(int pageIdx) {
+ compareAndSwapFlag(pageIdx, flags -> flags | (1L << pageIdx));
+ }
+
+ /**
+ * Replaces the 64-bit word holding the flag of {@code pageIdx} with {@code func} applied to it, using a CAS retry loop.
+ *
+ * @param pageIdx Page index.
+ * @param func Function computing the new word from the current one; nothing is written if it returns the word unchanged.
+ */
+ private void compareAndSwapFlag(int pageIdx, LongUnaryOperator func) {
+ long ptr = flagsPtr + ((pageIdx >> 3) & (~7L));
+
+ long oldFlags;
+ long newFlags;
+ // Retry until the CAS succeeds: other bits of the same word may be updated concurrently.
+ do {
+ oldFlags = getLong(ptr);
+ newFlags = func.applyAsLong(oldFlags);
+
+ if (oldFlags == newFlags) {
+ return;
+ }
+ } while (!compareAndSwapLong(null, ptr, oldFlags, newFlags));
+ }
+
+ /**
+ * Returns the number of bytes needed for {@code pagesCnt} flags: one bit per page, rounded up to whole 8-byte words.
+ *
+ * @param pagesCnt Pages count.
+ */
+ public static long requiredMemory(int pagesCnt) {
+ return ((pagesCnt + 63) / 8) & (~7L) /* 1 bit per page + 8 byte align */;
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicy.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicy.java
new file mode 100644
index 0000000..ea29aed
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicy.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.fullPageId;
+import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.INVALID_REL_PTR;
+import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.OUTDATED_REL_PTR;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.Segment;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * CLOCK page replacement policy: a clock hand sweeps per-page hit flags ({@link ClockPageReplacementFlags}) to pick victims.
+ */
+public class ClockPageReplacementPolicy extends PageReplacementPolicy {
+ /** Per-page hit flags: set on access, cleared as the clock hand passes over them. */
+ private final ClockPageReplacementFlags flags;
+
+ /**
+ * Constructor.
+ *
+ * @param seg Page memory segment.
+ * @param ptr Pointer to the memory region backing the hit flags.
+ * @param pagesCnt Pages count.
+ */
+ protected ClockPageReplacementPolicy(Segment seg, long ptr, int pagesCnt) {
+ super(seg);
+
+ flags = new ClockPageReplacementFlags(pagesCnt, ptr);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onHit(long relPtr) {
+ int pageIdx = (int) seg.pageIndex(relPtr);
+
+ flags.setFlag(pageIdx);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onRemove(long relPtr) {
+ int pageIdx = (int) seg.pageIndex(relPtr);
+
+ flags.clearFlag(pageIdx);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long replace() throws IgniteInternalCheckedException {
+ LoadedPagesMap loadedPages = seg.loadedPages();
+ // Examine at most loadedPages.size() candidates before giving up with an out-of-memory error.
+ for (int i = 0; i < loadedPages.size(); i++) {
+ int pageIdx = flags.poll();
+ // Resolve the candidate slot index to the page currently stored in it.
+ long relPtr = seg.relative(pageIdx);
+ long absPtr = seg.absolute(relPtr);
+
+ FullPageId fullId = fullPageId(absPtr);
+
+ // Check loaded pages map for outdated page.
+ relPtr = loadedPages.get(
+ fullId.groupId(),
+ fullId.effectivePageId(),
+ seg.partGeneration(fullId.groupId(), partitionId(fullId.pageId())),
+ INVALID_REL_PTR,
+ OUTDATED_REL_PTR
+ );
+ // The candidate is expected to be a loaded page, hence present in the map.
+ assert relPtr != INVALID_REL_PTR;
+ // A page of an outdated partition generation is reclaimed via refreshOutdatedPage.
+ if (relPtr == OUTDATED_REL_PTR) {
+ return seg.refreshOutdatedPage(fullId.groupId(), fullId.pageId(), true);
+ }
+
+ if (seg.tryToRemovePage(fullId, absPtr)) {
+ return relPtr;
+ }
+ // Page could not be removed: mark it as hit so the hand passes over it on the next sweep.
+ flags.setFlag(pageIdx);
+ }
+
+ throw seg.oomException("no pages to replace");
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicyFactory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicyFactory.java
new file mode 100644
index 0000000..0eeb7dc
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicyFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+/**
+ * Factory of {@link ClockPageReplacementPolicy} instances.
+ */
+public class ClockPageReplacementPolicyFactory implements PageReplacementPolicyFactory {
+ /** {@inheritDoc} The policy needs one hit-flag bit per page, see {@link ClockPageReplacementFlags#requiredMemory(int)}. */
+ @Override
+ public long requiredMemory(int pagesCnt) {
+ return ClockPageReplacementFlags.requiredMemory(pagesCnt);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public PageReplacementPolicy create(PageMemoryImpl.Segment seg, long ptr, int pagesCnt) {
+ return new ClockPageReplacementPolicy(seg, ptr, pagesCnt);
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/GroupPartitionId.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/GroupPartitionId.java
new file mode 100644
index 0000000..157bec0
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/GroupPartitionId.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Pair of group ID and partition ID. Immutable and comparable (by group ID, then partition ID); may be used as a key in maps.
+ */
+public class GroupPartitionId implements Comparable<GroupPartitionId> {
+ /** Group ID. */
+ private final int grpId;
+
+ /** Partition ID. */
+ private final int partId;
+
+ /**
+ * Creates group-partition tuple.
+ *
+ * @param grpId Group ID.
+ * @param partId Partition ID.
+ */
+ public GroupPartitionId(final int grpId, final int partId) {
+ this.grpId = grpId;
+ this.partId = partId;
+ }
+
+ /**
+ * Returns the group ID.
+ */
+ public int getGroupId() {
+ return grpId;
+ }
+
+ /**
+ * Returns the partition ID.
+ */
+ public int getPartitionId() {
+ return partId;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ return S.toString(GroupPartitionId.class, this);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ GroupPartitionId key = (GroupPartitionId) o;
+
+ if (grpId != key.grpId) {
+ return false;
+ }
+
+ return partId == key.partId;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int hashCode() {
+ int result = grpId;
+ // Combine both fields compared by equals(), so equal instances get equal hash codes.
+ result = 31 * result + partId;
+
+ return result;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int compareTo(GroupPartitionId o) {
+ if (getGroupId() < o.getGroupId()) {
+ return -1;
+ }
+
+ if (getGroupId() > o.getGroupId()) {
+ return 1;
+ }
+ // Same group: order by partition ID.
+ if (getPartitionId() < o.getPartitionId()) {
+ return -1;
+ }
+
+ if (getPartitionId() > o.getPartitionId()) {
+ return 1;
+ }
+
+ return 0;
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/LoadedPagesMap.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/LoadedPagesMap.java
new file mode 100644
index 0000000..d8bebff
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/LoadedPagesMap.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import java.util.function.BiConsumer;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Map from a (group ID, page ID) key to a long value (an address in an offheap segment).
+ *
+ * <p>The map supports versioning of
+ * entries. An outdated entry (one whose version is lower than requested) is not returned by get; the caller-supplied outdated value is returned
+ * instead.
+ *
+ * <p>This mapping is not thread-safe. Operations must be protected by external locking.
+ */
+public interface LoadedPagesMap {
+ /**
+ * Gets the value associated with the given key.
+ *
+ * @param grpId Group ID. First part of the key.
+ * @param pageId Page ID. Second part of the key.
+ * @param reqVer Requested entry version, counter associated with value.
+ * @param absent Value to return if the page is not present in the map.
+ * @param outdated Value to return if {@code reqVer} is greater than the version stored with the entry (the one passed to put).
+ * @return The value associated with the given key, or {@code absent} / {@code outdated} in the cases above.
+ */
+ long get(int grpId, long pageId, int reqVer, long absent, long outdated);
+
+ /**
+ * Associates the given key with the given value.
+ *
+ * @param grpId Group ID. First part of the key.
+ * @param pageId Page ID. Second part of the key.
+ * @param val Value to set.
+ * @param ver Version/counter associated with value, can be used to check if value is outdated.
+ */
+ void put(int grpId, long pageId, long val, int ver);
+
+ /**
+ * Refreshes an outdated value by setting the provided version on the entry for the given group and page. Must be called only for a present key
+ * whose version is outdated, e.g. after {@link #get(int, long, int, long, long)} returned its {@code outdated}
+ * argument.
+ *
+ * @param grpId First part of the key. Group ID.
+ * @param pageId Second part of the key. Page ID.
+ * @param ver Partition tag.
+ * @return The value associated with the given key.
+ * @throws IllegalArgumentException if called for an absent key or a key whose version is not outdated.
+ */
+ long refresh(int grpId, long pageId, int ver);
+
+ /**
+ * Removes the key-value association for the given key.
+ *
+ * @param grpId First part of the key. Group ID.
+ * @param pageId Second part of the key. Page ID.
+ * @return {@code True} if value was actually found and removed.
+ */
+ boolean remove(int grpId, long pageId);
+
+ /**
+ * Returns the maximum number of entries in the map. This maximum is not always reachable.
+ */
+ int capacity();
+
+ /**
+ * Returns the current number of entries in the map.
+ */
+ int size();
+
+ /**
+ * Finds the nearest present value at or to the right of the specified position.
+ *
+ * @param idxStart Index to start searching from. Bounded with {@link #capacity()}.
+ * @return Closest value to the index together with its partition tag, or {@code null} if no values are present.
+ */
+ @Nullable ReplaceCandidate getNearestAt(int idxStart);
+
+ /**
+ * Removes entries whose keys match the predicate within the specified index range of the map.
+ *
+ * @param startIdxToClear Index to start clearing at, inclusive. Bounded with {@link #capacity()}.
+ * @param endIdxToClear Index to stop clearing at, exclusive (the whole-map {@link #removeIf(KeyPredicate)} passes {@link #capacity()}).
+ * @param keyPred Test predicate for (group ID, page ID).
+ * @return List of removed values; empty cells and keys not matching the predicate contribute nothing to it.
+ */
+ LongArrayList removeIf(int startIdxToClear, int endIdxToClear, KeyPredicate keyPred);
+
+ /**
+ * Removes all entries whose keys match the predicate.
+ *
+ * @param keyPred Test predicate for (group ID, page ID).
+ * @return List of removed values; empty cells and keys not matching the predicate contribute nothing to it.
+ */
+ default LongArrayList removeIf(KeyPredicate keyPred) {
+ return removeIf(0, capacity(), keyPred);
+ }
+
+ /**
+ * Applies the action to every entry of the map.
+ *
+ * @param act Action applied to the key and value of each non-empty cell.
+ */
+ void forEach(BiConsumer<FullPageId, Long> act);
+
+ /**
+ * Predicate over a (group ID, page ID) key; it takes primitives so that no key objects need to be allocated.
+ */
+ @FunctionalInterface
+ interface KeyPredicate {
+ /**
+ * Returns {@code true} if the (group ID, page ID) key matches.
+ *
+ * @param grpId Group ID.
+ * @param pageId Page ID.
+ */
+ boolean test(int grpId, long pageId);
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageHeader.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageHeader.java
new file mode 100644
index 0000000..d776e21
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageHeader.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.INVALID_REL_PTR;
+import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.RELATIVE_PTR_MASK;
+import static org.apache.ignite.internal.util.GridUnsafe.decrementAndGetInt;
+import static org.apache.ignite.internal.util.GridUnsafe.getInt;
+import static org.apache.ignite.internal.util.GridUnsafe.getIntVolatile;
+import static org.apache.ignite.internal.util.GridUnsafe.getLong;
+import static org.apache.ignite.internal.util.GridUnsafe.incrementAndGetInt;
+import static org.apache.ignite.internal.util.GridUnsafe.putInt;
+import static org.apache.ignite.internal.util.GridUnsafe.putLong;
+import static org.apache.ignite.internal.util.GridUnsafe.putLongVolatile;
+
+import org.apache.ignite.internal.pagememory.FullPageId;
+
+/**
+ * Page header.
+ */
+class PageHeader {
+ /** Page marker. */
+ public static final long PAGE_MARKER = 0x0000000000000001L;
+
+ /** Dirty flag. */
+ private static final long DIRTY_FLAG = 0x0100000000000000L;
+
+ /** Page relative pointer. Does not change once a page is allocated. */
+ private static final int RELATIVE_PTR_OFFSET = 8;
+
+ /** Page ID offset. */
+ private static final int PAGE_ID_OFFSET = 16;
+
+ /** Page group ID offset. */
+ private static final int PAGE_GROUP_ID_OFFSET = 24;
+
+ /** Page pin counter offset. */
+ private static final int PAGE_PIN_CNT_OFFSET = 28;
+
+ /** Page temp copy buffer relative pointer offset. */
+ private static final int PAGE_TMP_BUF_OFFSET = 40;
+
+ /**
+ * Initializes the header of the page.
+ *
+ * @param absPtr Absolute pointer to initialize.
+ * @param relative Relative pointer to write.
+ */
+ public static void initNew(long absPtr, long relative) {
+ relative(absPtr, relative);
+
+ tempBufferPointer(absPtr, INVALID_REL_PTR);
+
+ putLong(absPtr, PAGE_MARKER);
+ putInt(absPtr + PAGE_PIN_CNT_OFFSET, 0);
+ }
+
+ /**
+ * Returns value of dirty flag.
+ *
+ * @param absPtr Absolute pointer.
+ */
+ public static boolean dirty(long absPtr) {
+ return flag(absPtr, DIRTY_FLAG);
+ }
+
+ /**
+ * Updates value of dirty flag.
+ *
+ * @param absPtr Page absolute pointer.
+ * @param dirty Dirty flag.
+ * @return Previous value of dirty flag.
+ */
+ public static boolean dirty(long absPtr, boolean dirty) {
+ return flag(absPtr, DIRTY_FLAG, dirty);
+ }
+
+ /**
+ * Returns flag value.
+ *
+ * @param absPtr Absolute pointer.
+ * @param flag Flag mask.
+ */
+ private static boolean flag(long absPtr, long flag) {
+ assert (flag & 0xFFFFFFFFFFFFFFL) == 0;
+ assert Long.bitCount(flag) == 1;
+
+ long relPtrWithFlags = getLong(absPtr + RELATIVE_PTR_OFFSET);
+
+ return (relPtrWithFlags & flag) != 0;
+ }
+
+ /**
+ * Sets flag value.
+ *
+ * @param absPtr Absolute pointer.
+ * @param flag Flag mask.
+ * @param set New flag value.
+ * @return Previous flag value.
+ */
+ private static boolean flag(long absPtr, long flag, boolean set) {
+ assert (flag & 0xFFFFFFFFFFFFFFL) == 0;
+ assert Long.bitCount(flag) == 1;
+
+ long relPtrWithFlags = getLong(absPtr + RELATIVE_PTR_OFFSET);
+
+ boolean was = (relPtrWithFlags & flag) != 0;
+
+ if (set) {
+ relPtrWithFlags |= flag;
+ } else {
+ relPtrWithFlags &= ~flag;
+ }
+
+ putLong(absPtr + RELATIVE_PTR_OFFSET, relPtrWithFlags);
+
+ return was;
+ }
+
+ /**
+ * Checks if page is pinned.
+ *
+ * @param absPtr Page pointer.
+ */
+ public static boolean isAcquired(long absPtr) {
+ return getInt(absPtr + PAGE_PIN_CNT_OFFSET) > 0;
+ }
+
+ /**
+ * Acquires a page.
+ *
+ * @param absPtr Absolute pointer.
+ * @return Number of acquires for the page.
+ */
+ public static int acquirePage(long absPtr) {
+ return incrementAndGetInt(absPtr + PAGE_PIN_CNT_OFFSET);
+ }
+
+ /**
+ * Releases the page.
+ *
+ * @param absPtr Absolute pointer.
+ * @return Number of acquires for the page.
+ */
+ public static int releasePage(long absPtr) {
+ return decrementAndGetInt(absPtr + PAGE_PIN_CNT_OFFSET);
+ }
+
+ /**
+ * Returns number of acquires for the page.
+ *
+ * @param absPtr Absolute pointer.
+ */
+ public static int pinCount(long absPtr) {
+ return getIntVolatile(null, absPtr);
+ }
+
+ /**
+ * Reads relative pointer from the page at the given absolute position.
+ *
+ * @param absPtr Absolute memory pointer to the page header.
+ */
+ public static long readRelative(long absPtr) {
+ return getLong(absPtr + RELATIVE_PTR_OFFSET) & RELATIVE_PTR_MASK;
+ }
+
+ /**
+ * Writes relative pointer to the page at the given absolute position.
+ *
+ * @param absPtr Absolute memory pointer to the page header.
+ * @param relPtr Relative pointer to write.
+ */
+ public static void relative(long absPtr, long relPtr) {
+ putLong(absPtr + RELATIVE_PTR_OFFSET, relPtr & RELATIVE_PTR_MASK);
+ }
+
+ /**
+ * Volatile write for current timestamp to page in {@code absAddr} address.
+ *
+ * @param absPtr Absolute page address.
+ * @param tstamp Timestamp.
+ */
+ public static void writeTimestamp(final long absPtr, long tstamp) {
+ tstamp &= 0xFFFFFFFFFFFFFF00L;
+
+ putLongVolatile(null, absPtr, tstamp | 0x01);
+ }
+
+ /**
+ * Read for timestamp from page in {@code absAddr} address.
+ *
+ * @param absPtr Absolute page address.
+ * @return Timestamp.
+ */
+ public static long readTimestamp(final long absPtr) {
+ long markerAndTs = getLong(absPtr);
+
+ // Clear last byte as it is occupied by page marker.
+ return markerAndTs & ~0xFF;
+ }
+
+ /**
+ * Sets pointer to checkpoint buffer.
+ *
+ * @param absPtr Page absolute pointer.
+ * @param tmpRelPtr Temp buffer relative pointer or {@link PageMemoryImpl#INVALID_REL_PTR} if page is not copied to checkpoint buffer.
+ */
+ public static void tempBufferPointer(long absPtr, long tmpRelPtr) {
+ putLong(absPtr + PAGE_TMP_BUF_OFFSET, tmpRelPtr);
+ }
+
+ /**
+ * Gets pointer to checkpoint buffer or {@link PageMemoryImpl#INVALID_REL_PTR} if page is not copied to checkpoint buffer.
+ *
+ * @param absPtr Page absolute pointer.
+ * @return Temp buffer relative pointer.
+ */
+ public static long tempBufferPointer(long absPtr) {
+ return getLong(absPtr + PAGE_TMP_BUF_OFFSET);
+ }
+
+ /**
+ * Reads page ID from the page at the given absolute position.
+ *
+ * @param absPtr Absolute memory pointer to the page header.
+ * @return Page ID written to the page.
+ */
+ public static long readPageId(long absPtr) {
+ return getLong(absPtr + PAGE_ID_OFFSET);
+ }
+
+ /**
+ * Writes page ID to the page at the given absolute position.
+ *
+ * @param absPtr Absolute memory pointer to the page header.
+ * @param pageId Page ID to write.
+ */
+ private static void pageId(long absPtr, long pageId) {
+ putLong(absPtr + PAGE_ID_OFFSET, pageId);
+ }
+
+ /**
+ * Reads group ID from the page at the given absolute pointer.
+ *
+ * @param absPtr Absolute memory pointer to the page header.
+ * @return Group ID written to the page.
+ */
+ private static int readPageGroupId(final long absPtr) {
+ return getInt(absPtr + PAGE_GROUP_ID_OFFSET);
+ }
+
+ /**
+ * Writes group ID from the page at the given absolute pointer.
+ *
+ * @param absPtr Absolute memory pointer to the page header.
+ * @param grpId Group ID to write.
+ */
+ private static void pageGroupId(final long absPtr, final int grpId) {
+ putInt(absPtr + PAGE_GROUP_ID_OFFSET, grpId);
+ }
+
+ /**
+ * Reads page ID and group ID from the page at the given absolute pointer.
+ *
+ * @param absPtr Absolute memory pointer to the page header.
+ * @return Full page ID written to the page.
+ */
+ public static FullPageId fullPageId(final long absPtr) {
+ return new FullPageId(readPageId(absPtr), readPageGroupId(absPtr));
+ }
+
+ /**
+ * Writes page ID and group ID from the page at the given absolute pointer.
+ *
+ * @param absPtr Absolute memory pointer to the page header.
+ * @param fullPageId Full page ID to write.
+ */
+ public static void fullPageId(final long absPtr, final FullPageId fullPageId) {
+ pageId(absPtr, fullPageId.pageId());
+
+ pageGroupId(absPtr, fullPageId.groupId());
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryEx.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryEx.java
new file mode 100644
index 0000000..3297814
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryEx.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.configuration.storage.StorageException;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * Page memory with some persistence related additions.
+ */
+//TODO IGNITE-16350 Improve javadoc in this class.
+//TODO IGNITE-16350 Consider removing this interface.
+public interface PageMemoryEx extends PageMemory {
+    /**
+     * Acquires a read lock associated with the given page.
+     *
+     * @param absPtr Absolute pointer to the page to read-lock.
+     * @param pageId Page ID.
+     * @param force Force flag.
+     * @param touch Whether to update the page timestamp.
+     * @return Pointer to the page read buffer.
+     */
+    long readLock(long absPtr, long pageId, boolean force, boolean touch);
+
+    /**
+     * Acquires a write lock on the page.
+     *
+     * @param grpId Group ID.
+     * @param pageId Page ID.
+     * @param page Page pointer.
+     * @param restore Determines if the page is locked for restore memory (crash recovery).
+     * @return Pointer to the page read buffer.
+     */
+    long writeLock(int grpId, long pageId, long page, boolean restore);
+
+    /**
+     * Releases a write-locked page.
+     *
+     * @param grpId Group ID.
+     * @param pageId Page ID.
+     * @param page Page pointer.
+     * @param dirtyFlag Determines whether the page was modified since the last checkpoint.
+     * @param restore Determines if the page is locked for restore memory (crash recovery).
+     */
+    void writeUnlock(int grpId, long pageId, long page, boolean dirtyFlag, boolean restore);
+
+    /**
+     * Gets the partition metadata page ID for the specified group and partition.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     * @return Meta page ID for {@code grpId} and {@code partId}.
+     */
+    long partitionMetaPageId(int grpId, int partId);
+
+    /**
+     * Returns an absolute pointer to a page, associated with the given page ID.
+     *
+     * @param grpId Group ID.
+     * @param pageId Page ID.
+     * @param pageAllocated Flag that is set if a new page was allocated in offheap memory.
+     * @return Page.
+     * @throws IgniteInternalCheckedException If failed.
+     * @see #acquirePage(int, long) Sets additional flag indicating that page was not found in memory and had to be allocated.
+     */
+    long acquirePage(int grpId, long pageId, AtomicBoolean pageAllocated) throws IgniteInternalCheckedException;
+
+    /**
+     * Returns an absolute pointer to a page, associated with the given page ID.
+     *
+     * @param grpId Group ID.
+     * @param pageId Page ID.
+     * @param statHldr IO statistics holder.
+     * @param restore Whether the page is acquired for restore memory (crash recovery).
+     * @return Page.
+     * @throws IgniteInternalCheckedException If failed.
+     * @throws StorageException If page reading failed from storage.
+     * @see #acquirePage(int, long) Will read page from file if it is not present in memory.
+     */
+    long acquirePage(int grpId, long pageId, IoStatisticsHolder statHldr, boolean restore) throws IgniteInternalCheckedException;
+
+    /**
+     * Marks partition as invalid / outdated.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     * @return New partition generation (growing 1-based partition file version).
+     */
+    int invalidate(int grpId, int partId);
+
+    /**
+     * Clears internal metadata of destroyed group.
+     *
+     * @param grpId Group ID.
+     */
+    void onCacheGroupDestroyed(int grpId);
+
+    /**
+     * Returns the total number of pages that can be placed in memory.
+     */
+    long totalPages();
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java
new file mode 100644
index 0000000..d25c039
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java
@@ -0,0 +1,1601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static java.lang.System.lineSeparator;
+import static org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema.CLOCK_REPLACEMENT_MODE;
+import static org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema.RANDOM_LRU_REPLACEMENT_MODE;
+import static org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema.SEGMENTED_LRU_REPLACEMENT_MODE;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getCrc;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getPageId;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getType;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getVersion;
+import static org.apache.ignite.internal.pagememory.io.PageIo.setPageId;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.dirty;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.fullPageId;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.isAcquired;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.tempBufferPointer;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.writeTimestamp;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.effectivePageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageIndex;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.tag;
+import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
+import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapInt;
+import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapLong;
+import static org.apache.ignite.internal.util.GridUnsafe.copyMemory;
+import static org.apache.ignite.internal.util.GridUnsafe.getInt;
+import static org.apache.ignite.internal.util.GridUnsafe.putIntVolatile;
+import static org.apache.ignite.internal.util.GridUnsafe.setMemory;
+import static org.apache.ignite.internal.util.GridUnsafe.wrapPointer;
+import static org.apache.ignite.internal.util.IgniteUtils.hash;
+import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
+import static org.apache.ignite.internal.util.IgniteUtils.readableSize;
+import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
+import static org.apache.ignite.internal.util.IgniteUtils.toHexString;
+import static org.apache.ignite.internal.util.OffheapReadWriteLock.TAG_LOCK_ALWAYS;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionView;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.pagememory.mem.DirectMemoryProvider;
+import org.apache.ignite.internal.pagememory.mem.DirectMemoryRegion;
+import org.apache.ignite.internal.pagememory.mem.IgniteOutOfMemoryException;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.OffheapReadWriteLock;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Page header structure is described by the following diagram.
+ *
+ * <p>When page is not allocated (in a free list):
+ * <pre>
+ * +--------+------------------------------------------------------+
+ * |8 bytes | PAGE_SIZE + PAGE_OVERHEAD - 8 bytes |
+ * +--------+------------------------------------------------------+
+ * |Next ptr| Page data |
+ * +--------+------------------------------------------------------+
+ * </pre>
+ *
+ * <p>When page is allocated and is in use:
+ * <pre>
+ * +------------------+--------+--------+----+----+--------+--------+----------------------+
+ * | 8 bytes |8 bytes |8 bytes |4 b |4 b |8 bytes |8 bytes | PAGE_SIZE |
+ * +------------------+--------+--------+----+----+--------+--------+----------------------+
+ * | Marker/Timestamp |Rel ptr |Page ID |C ID|PIN | LOCK |TMP BUF | Page data |
+ * +------------------+--------+--------+----+----+--------+--------+----------------------+
+ * </pre>
+ *
+ * <p>Note that first 8 bytes of page header are used either for page marker or for next relative pointer depending on whether the page is
+ * in use or not.
+ */
+@SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
+public class PageMemoryImpl implements PageMemoryEx {
+    /** Logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(PageMemoryImpl.class);
+
+    /** Full relative pointer mask: the lower 7 bytes of a relative pointer word (the highest byte is left for page header flags). */
+    public static final long RELATIVE_PTR_MASK = 0xFFFFFFFFFFFFFFL;
+
+    /** Invalid relative pointer value (equal to {@link #RELATIVE_PTR_MASK}). */
+    static final long INVALID_REL_PTR = RELATIVE_PTR_MASK;
+
+    /** Pointer which means that this page is outdated (for example, the group was destroyed or the partition was evicted). */
+    static final long OUTDATED_REL_PTR = INVALID_REL_PTR + 1;
+
+    /** Page lock offset within the page header. */
+    public static final int PAGE_LOCK_OFFSET = 32;
+
+    /** Page header size: 8b Marker/timestamp 8b Relative pointer 8b Page ID 4b Group ID 4b Pin count 8b Lock 8b Temporary buffer. */
+    public static final int PAGE_OVERHEAD = 48;
+
+    /** Try again tag. */
+    public static final int TRY_AGAIN_TAG = -1;
+
+    /** Data region configuration view. */
+    private final PageMemoryDataRegionView dataRegionCfg;
+
+    /** Page IO registry. */
+    private final PageIoRegistry ioRegistry;
+
+    /** Page manager. */
+    private final PageReadWriteManager pmPageMgr;
+
+    /** System page size: the configured page size plus {@link #PAGE_OVERHEAD}. */
+    private final int sysPageSize;
+
+    /** Page replacement policy factory. */
+    private final PageReplacementPolicyFactory pageReplacementPolicyFactory;
+
+    /** Direct memory allocator. */
+    private final DirectMemoryProvider directMemoryProvider;
+
+    /** Segments array. */
+    private volatile Segment[] segments;
+
+    /** Lock for segments changes. */
+    private final Object segmentsLock = new Object();
+
+    /** Offheap read write lock instance. */
+    private final OffheapReadWriteLock rwLock;
+
+    /** Callback invoked to track changes in pages. {@code Null} if page tracking functionality is disabled. */
+    @Nullable
+    private final PageChangeTracker changeTracker;
+
+    /** Atomic updater for the {@code pageReplacementWarned} field. */
+    private static final AtomicIntegerFieldUpdater<PageMemoryImpl> pageReplacementWarnedFieldUpdater =
+            AtomicIntegerFieldUpdater.newUpdater(PageMemoryImpl.class, "pageReplacementWarned");
+
+    /** Flag indicating page replacement started (rotation with disk), allocating new page requires freeing old one. */
+    private volatile int pageReplacementWarned;
+
+    /** Segments sizes. */
+    private final long[] sizes;
+
+    /** {@code False} if memory was not started or already stopped and is not supposed for any usage. */
+    private volatile boolean started;
+
+    /**
+     * Constructor.
+     *
+     * @param directMemoryProvider Memory allocator to use.
+     * @param dataRegionCfg Data region configuration.
+     * @param ioRegistry IO registry.
+     * @param sizes Segments sizes.
+     * @param pmPageMgr Page store manager.
+     * @param changeTracker Callback invoked to track changes in pages.
+     */
+    public PageMemoryImpl(
+            DirectMemoryProvider directMemoryProvider,
+            PageMemoryDataRegionConfiguration dataRegionCfg,
+            PageIoRegistry ioRegistry,
+            long[] sizes,
+            PageReadWriteManager pmPageMgr,
+            @Nullable PageChangeTracker changeTracker
+    ) {
+        PageMemoryDataRegionView regionView = (PageMemoryDataRegionView) dataRegionCfg.value();
+
+        this.directMemoryProvider = directMemoryProvider;
+        this.dataRegionCfg = regionView;
+        this.ioRegistry = ioRegistry;
+        this.sizes = sizes;
+        this.pmPageMgr = pmPageMgr;
+        this.changeTracker = changeTracker;
+
+        // Each page in memory is prefixed with a header of PAGE_OVERHEAD bytes.
+        sysPageSize = regionView.pageSize() + PAGE_OVERHEAD;
+
+        rwLock = new OffheapReadWriteLock(128);
+
+        pageReplacementPolicyFactory = createPageReplacementPolicyFactory(regionView.replacementMode());
+    }
+
+    /**
+     * Creates the page replacement policy factory that matches the configured replacement mode.
+     *
+     * @param replacementMode Replacement mode from the data region configuration.
+     * @return Page replacement policy factory.
+     * @throws IgniteInternalException If the replacement mode is unknown.
+     */
+    private static PageReplacementPolicyFactory createPageReplacementPolicyFactory(String replacementMode) {
+        switch (replacementMode) {
+            case RANDOM_LRU_REPLACEMENT_MODE:
+                return new RandomLruPageReplacementPolicyFactory();
+
+            case SEGMENTED_LRU_REPLACEMENT_MODE:
+                return new SegmentedLruPageReplacementPolicyFactory();
+
+            case CLOCK_REPLACEMENT_MODE:
+                return new ClockPageReplacementPolicyFactory();
+
+            default:
+                throw new IgniteInternalException("Unexpected page replacement mode: " + replacementMode);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void start() throws IgniteInternalException {
+        synchronized (segmentsLock) {
+            if (started) {
+                return;
+            }
+
+            started = true;
+
+            directMemoryProvider.initialize(sizes);
+
+            // Drain every region from the provider: all but the last back segments, the last one is the checkpoint buffer.
+            List<DirectMemoryRegion> allRegions = new ArrayList<>(sizes.length);
+
+            for (DirectMemoryRegion next = directMemoryProvider.nextRegion(); next != null; next = directMemoryProvider.nextRegion()) {
+                allRegions.add(next);
+            }
+
+            int segCnt = allRegions.size() - 1;
+
+            Segment[] newSegments = new Segment[segCnt];
+
+            long checkpointBufSize = allRegions.get(segCnt).size();
+
+            long allocatedBytes = 0;
+            int pageCnt = 0;
+            long tblBytes = 0;
+            long replBytes = 0;
+
+            for (int idx = 0; idx < segCnt; idx++) {
+                assert idx < newSegments.length;
+
+                DirectMemoryRegion region = allRegions.get(idx);
+
+                allocatedBytes += region.size();
+
+                Segment seg = new Segment(idx, region);
+
+                newSegments[idx] = seg;
+
+                pageCnt += seg.pages();
+                tblBytes += seg.tableSize();
+                replBytes += seg.replacementSize();
+            }
+
+            segments = newSegments;
+
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Started page memory [memoryAllocated=" + readableSize(allocatedBytes, false)
+                        + ", pages=" + pageCnt
+                        + ", tableSize=" + readableSize(tblBytes, false)
+                        + ", replacementSize=" + readableSize(replBytes, false)
+                        + ", checkpointBuffer=" + readableSize(checkpointBufSize, false)
+                        + ']');
+            }
+        }
+    }
+
+ /** {@inheritDoc} */
+ @Override
+ public void stop(boolean deallocate) throws IgniteInternalException {
+ synchronized (segmentsLock) {
+ if (!started) {
+ return;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Stopping page memory.");
+ }
+
+ if (segments != null) {
+ for (Segment seg : segments) {
+ seg.close();
+ }
+ }
+
+ started = false;
+
+ directMemoryProvider.shutdown(deallocate);
+ }
+ }
+
+    /** {@inheritDoc} */
+    @Override
+    public void releasePage(int grpId, long pageId, long page) {
+        assert started;
+
+        Segment owner = segment(grpId, pageId);
+
+        // The page is released while holding the owning segment's read lock.
+        owner.readLock().lock();
+
+        try {
+            owner.releasePage(page);
+        } finally {
+            owner.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long readLock(int grpId, long pageId, long page) {
+        assert started;
+
+        // Non-forced lock that also refreshes the page access timestamp.
+        return readLock(page, pageId, false, true);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long readLock(long absPtr, long pageId, boolean force, boolean touch) {
+        assert started;
+
+        // A forced lock uses tag -1 instead of the tag encoded in the page ID.
+        int lockTag;
+
+        if (force) {
+            lockTag = -1;
+        } else {
+            lockTag = tag(pageId);
+        }
+
+        if (rwLock.readLock(absPtr + PAGE_LOCK_OFFSET, lockTag)) {
+            if (touch) {
+                writeTimestamp(absPtr, coarseCurrentTimeMillis());
+            }
+
+            assert getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO IGNITE-16612
+
+            // Page data starts right after the header.
+            return absPtr + PAGE_OVERHEAD;
+        }
+
+        // The lock was not acquired.
+        return 0;
+    }
+
+    /**
+     * Read-locks the page and updates its access timestamp.
+     *
+     * @param absPtr Absolute pointer to the page.
+     * @param pageId Page ID.
+     * @param force Whether to lock with tag {@code -1} instead of the tag encoded in the page ID.
+     * @return Pointer to the page data, or {@code 0} if the lock was not acquired.
+     */
+    private long readLock(long absPtr, long pageId, boolean force) {
+        boolean touch = true;
+
+        return readLock(absPtr, pageId, force, touch);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void readUnlock(int grpId, long pageId, long page) {
+        assert started;
+
+        // Only the absolute page pointer is needed to release the read lock.
+        readUnlockPage(page);
+    }
+
+ /** {@inheritDoc} */
+ @Override
+ public long writeLock(int grpId, long pageId, long page) {
+ assert started;
+
+ return writeLock(grpId, pageId, page, false);
+ }
+
+    /** {@inheritDoc} */
+    @Override
+    public long writeLock(int grpId, long pageId, long page, boolean restore) {
+        assert started;
+
+        FullPageId fullId = new FullPageId(pageId, grpId);
+
+        // The last argument is false when locking for restore (presumably disables the tag check, TODO confirm).
+        return writeLockPage(page, fullId, !restore);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long tryWriteLock(int grpId, long pageId, long page) {
+        assert started;
+
+        FullPageId fullId = new FullPageId(pageId, grpId);
+
+        return tryWriteLockPage(page, fullId, true);
+    }
+
+ /** {@inheritDoc} */
+ @Override
+ public void writeUnlock(int grpId, long pageId, long page, boolean dirtyFlag) {
+ assert started;
+
+ writeUnlock(grpId, pageId, page, dirtyFlag, false);
+ }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeUnlock(int grpId, long pageId, long page, boolean dirtyFlag, boolean restore) {
+        assert started;
+
+        FullPageId fullId = new FullPageId(pageId, grpId);
+
+        writeUnlockPage(page, fullId, dirtyFlag, restore);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isDirty(int grpId, long pageId, long page) {
+        assert started;
+
+        // The dirty flag is stored in the page header, so only the page pointer is needed.
+        return isDirty(page);
+    }
+
+    /**
+     * Checks the dirty flag stored in the header of the page.
+     *
+     * @param absPtr Absolute pointer to the page.
+     * @return {@code true} if the page is dirty.
+     */
+    boolean isDirty(long absPtr) {
+        return PageHeader.dirty(absPtr);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Besides obtaining a new page ID from the page read/write manager, this method places a zeroed page into memory and marks it
+     * dirty.
+     */
+    @Override
+    public long allocatePage(int grpId, int partId, byte flags) throws IgniteInternalCheckedException {
+        assert partId <= MAX_PARTITION_ID || partId == INDEX_PARTITION && flags == FLAG_AUX : "flags = " + flags + ", partId = " + partId;
+
+        assert started;
+
+        long pageId = pmPageMgr.allocatePage(grpId, partId, flags);
+
+        assert pageIndex(pageId) > 0; // It's crucial for tracking pages (the zero page is the super page).
+
+        // We need to allocate the page in memory and mark it dirty so that it is saved by the next checkpoint.
+        // Otherwise the file could contain an empty page that would be saved in a snapshot and then fail to be read,
+        // because there is no CRC inside it.
+        Segment seg = segment(grpId, pageId);
+
+        seg.writeLock().lock();
+
+        FullPageId fullId = new FullPageId(pageId, grpId);
+
+        try {
+            // Look up the page for the current partition generation: INVALID_REL_PTR if absent, OUTDATED_REL_PTR presumably when
+            // a mapping exists for a stale generation (TODO confirm against LoadedPagesMap implementations).
+            long relPtr = seg.loadedPages.get(
+                    grpId,
+                    effectivePageId(pageId),
+                    seg.partGeneration(grpId, partId),
+                    INVALID_REL_PTR,
+                    OUTDATED_REL_PTR
+            );
+
+            if (relPtr == OUTDATED_REL_PTR) {
+                // Take over the outdated page's memory and notify the replacement policy that it was removed.
+                relPtr = seg.refreshOutdatedPage(grpId, pageId, false);
+
+                seg.pageReplacementPolicy.onRemove(relPtr);
+            }
+
+            // Not in memory: borrow a free page or allocate one...
+            if (relPtr == INVALID_REL_PTR) {
+                relPtr = seg.borrowOrAllocateFreePage(pageId);
+            }
+
+            // ...and if that failed too, obtain one through page replacement.
+            if (relPtr == INVALID_REL_PTR) {
+                relPtr = seg.removePageForReplacement();
+            }
+
+            long absPtr = seg.absolute(relPtr);
+
+            // Zero the page data, i.e. everything after the header.
+            setMemory(absPtr + PAGE_OVERHEAD, pageSize(), (byte) 0);
+
+            fullPageId(absPtr, fullId);
+            writeTimestamp(absPtr, coarseCurrentTimeMillis());
+            rwLock.init(absPtr + PAGE_LOCK_OFFSET, tag(pageId));
+
+            assert getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO IGNITE-16612
+
+            assert !isAcquired(absPtr) :
+                    "Pin counter must be 0 for a new page [relPtr=" + hexLong(relPtr)
+                            + ", absPtr=" + hexLong(absPtr) + ", pinCntr=" + PageHeader.pinCount(absPtr) + ']';
+
+            setDirty(fullId, absPtr, true, true);
+
+            seg.pageReplacementPolicy.onMiss(relPtr);
+
+            // Register the page under the current partition generation.
+            seg.loadedPages.put(grpId, effectivePageId(pageId), relPtr, seg.partGeneration(grpId, partId));
+        } catch (IgniteOutOfMemoryException oom) {
+            // Rethrow with a message that describes the data region and remediation hints, keeping the original as the cause.
+            IgniteOutOfMemoryException e = new IgniteOutOfMemoryException("Out of memory in data region ["
+                    + "name=" + dataRegionCfg.name()
+                    + ", initSize=" + readableSize(dataRegionCfg.initSize(), false)
+                    + ", maxSize=" + readableSize(dataRegionCfg.maxSize(), false)
+                    + ", persistenceEnabled=" + dataRegionCfg.persistent() + "] Try the following:" + lineSeparator()
+                    + " ^-- Increase maximum off-heap memory size (PageMemoryDataRegionConfiguration.maxSize)" + lineSeparator()
+                    + " ^-- Enable eviction or expiration policies"
+            );
+
+            e.initCause(oom);
+
+            throw e;
+        } finally {
+            seg.writeLock().unlock();
+        }
+
+        return pageId;
+    }
+
+ /** {@inheritDoc} */
+ @Override
+ public ByteBuffer pageBuffer(long pageAddr) {
+ return wrapPointer(pageAddr, pageSize());
+ }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Not supported with persistence: trips an assertion when assertions are enabled, otherwise reports that nothing was freed.
+     */
+    @Override
+    public boolean freePage(int grpId, long pageId) {
+        final boolean freed = false;
+
+        assert freed : "Free page should be never called directly when persistence is enabled.";
+
+        return freed;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long partitionMetaPageId(int grpId, int partId) {
+        assert started;
+
+        //TODO IGNITE-16350 Consider reworking in FLAG_AUX.
+        final int metaPageIdx = 0;
+
+        return pageId(partId, FLAG_DATA, metaPageIdx);
+    }
+
+ /** {@inheritDoc} */
+ @Override
+ public long acquirePage(int grpId, long pageId) throws IgniteInternalCheckedException {
+ return acquirePage(grpId, pageId, IoStatisticsHolderNoOp.INSTANCE, false);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long acquirePage(int grpId, long pageId, IoStatisticsHolder statHolder) throws IgniteInternalCheckedException {
+ assert started;
+
+ return acquirePage(grpId, pageId, statHolder, false);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long acquirePage(int grpId, long pageId, AtomicBoolean pageAllocated) throws IgniteInternalCheckedException {
+ return acquirePage(grpId, pageId, IoStatisticsHolderNoOp.INSTANCE, false, pageAllocated);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long acquirePage(int grpId, long pageId, IoStatisticsHolder statHolder, boolean restore) throws IgniteInternalCheckedException {
+ return acquirePage(grpId, pageId, statHolder, restore, null);
+ }
+
/**
 * Pins a page in memory, loading it if it is not present in the segment's loaded pages table.
 *
 * <p>Fast path: under the segment read lock, if the page is loaded for the current partition generation, its pin
 * counter is incremented and the pointer is returned. Slow path: under the segment write lock the lookup is repeated.
 * <ul>
 *     <li>If the page is still absent, a free page slot is borrowed/allocated, or a page is evicted via the
 *     replacement policy, and registered in the table.</li>
 *     <li>If the entry is outdated, its slot is refreshed and zero-filled.</li>
 *     <li>Otherwise another thread loaded it in the meantime.</li>
 * </ul>
 *
 * <p>When the content has to come from the store ({@code restore == false} for a newly registered page), the page
 * write lock is taken while the segment write lock is still held. The actual read from {@code pmPageMgr} happens in
 * the {@code finally} block after the segment lock has been released.
 *
 * @param grpId Group ID.
 * @param pageId Page ID.
 * @param statHolder Statistics holder used to track logical and physical reads.
 * @param restore If {@code true}, a newly registered page is zero-filled and stamped with {@code pageId} instead of
 *      being read from the store.
 * @param pageAllocated If not {@code null}, set to {@code true} when the page was absent and a new slot had to be
 *      taken for it.
 * @return Absolute pointer to the acquired page.
 * @throws IgniteInternalCheckedException If the page could not be acquired, e.g. eviction or the store read failed.
 */
private long acquirePage(
        int grpId,
        long pageId,
        IoStatisticsHolder statHolder,
        boolean restore,
        @Nullable AtomicBoolean pageAllocated
) throws IgniteInternalCheckedException {
    assert started;

    int partId = partitionId(pageId);

    Segment seg = segment(grpId, pageId);

    // Fast path: lookup under the segment read lock.
    seg.readLock().lock();

    try {
        // Absent and outdated entries are both reported as INVALID_REL_PTR here, so both fall through to the slow path.
        long relPtr = seg.loadedPages.get(
                grpId,
                effectivePageId(pageId),
                seg.partGeneration(grpId, partId),
                INVALID_REL_PTR,
                INVALID_REL_PTR
        );

        // The page is loaded to the memory.
        if (relPtr != INVALID_REL_PTR) {
            long absPtr = seg.absolute(relPtr);

            seg.acquirePage(absPtr);

            seg.pageReplacementPolicy.onHit(relPtr);

            statHolder.trackLogicalRead(absPtr + PAGE_OVERHEAD);

            return absPtr;
        }
    } finally {
        seg.readLock().unlock();
    }

    FullPageId fullId = new FullPageId(pageId, grpId);

    // Slow path: the loaded pages table is modified under the segment write lock.
    seg.writeLock().lock();

    // Absolute pointer of the page that stays write-locked until its content is read from the store; -1 if none.
    long lockedPageAbsPtr = -1;
    boolean readPageFromStore = false;

    try {
        // Double-check.
        long relPtr = seg.loadedPages.get(
                grpId,
                fullId.effectivePageId(),
                seg.partGeneration(grpId, partId),
                INVALID_REL_PTR,
                OUTDATED_REL_PTR
        );

        long absPtr;

        if (relPtr == INVALID_REL_PTR) {
            // Not loaded: take a free page slot, or evict a page if the pool is exhausted.
            relPtr = seg.borrowOrAllocateFreePage(pageId);

            if (pageAllocated != null) {
                pageAllocated.set(true);
            }

            if (relPtr == INVALID_REL_PTR) {
                relPtr = seg.removePageForReplacement();
            }

            absPtr = seg.absolute(relPtr);

            fullPageId(absPtr, fullId);
            writeTimestamp(absPtr, coarseCurrentTimeMillis());

            assert !isAcquired(absPtr) :
                    "Pin counter must be 0 for a new page [relPtr=" + hexLong(relPtr) + ", absPtr=" + hexLong(absPtr) + ']';

            // We can clear dirty flag after the page has been allocated.
            setDirty(fullId, absPtr, false, false);

            seg.pageReplacementPolicy.onMiss(relPtr);

            seg.loadedPages.put(
                    grpId,
                    fullId.effectivePageId(),
                    relPtr,
                    seg.partGeneration(grpId, partId)
            );

            long pageAddr = absPtr + PAGE_OVERHEAD;

            if (!restore) {
                readPageFromStore = true;
            } else {
                setMemory(absPtr + PAGE_OVERHEAD, pageSize(), (byte) 0);

                // Must init page ID in order to ensure RWLock tag consistency.
                setPageId(pageAddr, pageId);
            }

            rwLock.init(absPtr + PAGE_LOCK_OFFSET, tag(pageId));

            if (readPageFromStore) {
                // Keep the page write-locked until its content is read in the finally block below (presumably so that
                // other threads cannot observe it half-initialized).
                boolean locked = rwLock.writeLock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS);

                assert locked : "Page ID " + fullId + " expected to be locked";

                lockedPageAbsPtr = absPtr;
            }
        } else if (relPtr == OUTDATED_REL_PTR) {
            // Entry belongs to an older partition generation: reuse its slot with zeroed content.
            assert pageIndex(pageId) == 0 : fullId;

            relPtr = seg.refreshOutdatedPage(grpId, pageId, false);

            absPtr = seg.absolute(relPtr);

            long pageAddr = absPtr + PAGE_OVERHEAD;

            setMemory(pageAddr, pageSize(), (byte) 0);

            fullPageId(absPtr, fullId);
            writeTimestamp(absPtr, coarseCurrentTimeMillis());
            setPageId(pageAddr, pageId);

            assert !isAcquired(absPtr) :
                    "Pin counter must be 0 for a new page [relPtr=" + hexLong(relPtr) + ", absPtr=" + hexLong(absPtr) + ']';

            rwLock.init(absPtr + PAGE_LOCK_OFFSET, tag(pageId));

            seg.pageReplacementPolicy.onRemove(relPtr);
            seg.pageReplacementPolicy.onMiss(relPtr);
        } else {
            // Loaded by another thread between the read-locked and the write-locked lookups.
            absPtr = seg.absolute(relPtr);

            seg.pageReplacementPolicy.onHit(relPtr);
        }

        seg.acquirePage(absPtr);

        // Pages read from the store are accounted as physical reads in the finally block instead.
        if (!readPageFromStore) {
            statHolder.trackLogicalRead(absPtr + PAGE_OVERHEAD);
        }

        return absPtr;
    } finally {
        seg.writeLock().unlock();

        // The store read happens outside of the segment lock, holding only the page write lock.
        if (readPageFromStore) {
            assert lockedPageAbsPtr != -1 : "Page is expected to have a valid address [pageId=" + fullId
                    + ", lockedPageAbsPtr=" + hexLong(lockedPageAbsPtr) + ']';

            assert isPageWriteLocked(lockedPageAbsPtr) : "Page is expected to be locked: [pageId=" + fullId + "]";

            long pageAddr = lockedPageAbsPtr + PAGE_OVERHEAD;

            ByteBuffer buf = wrapPointer(pageAddr, pageSize());

            long actualPageId = 0;

            try {
                pmPageMgr.read(grpId, pageId, buf, false);

                statHolder.trackPhysicalAndLogicalRead(pageAddr);

                actualPageId = getPageId(buf);
            } finally {
                // If the read failed, actualPageId is still 0 and the lock is released with TAG_LOCK_ALWAYS.
                rwLock.writeUnlock(lockedPageAbsPtr + PAGE_LOCK_OFFSET, actualPageId == 0 ? TAG_LOCK_ALWAYS : tag(actualPageId));
            }
        }
    }
}
+
+ /** {@inheritDoc} */
+ @Override
+ public int pageSize() {
+ return sysPageSize - PAGE_OVERHEAD;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int systemPageSize() {
+ return sysPageSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int realPageSize(int grpId) {
+ return pageSize();
+ }
+
+ /**
+ * Returns Max dirty ratio from the segments.
+ */
+ double getDirtyPagesRatio() {
+ if (segments == null) {
+ return 0;
+ }
+
+ double res = 0;
+
+ for (Segment segment : segments) {
+ res = Math.max(res, segment.getDirtyPagesRatio());
+ }
+
+ return res;
+ }
+
+ /**
+ * Returns total pages can be placed in all segments.
+ */
+ @Override
+ public long totalPages() {
+ if (segments == null) {
+ return 0;
+ }
+
+ long res = 0;
+
+ for (Segment segment : segments) {
+ res += segment.pages();
+ }
+
+ return res;
+ }
+
+ private void copyInBuffer(long absPtr, ByteBuffer tmpBuf) {
+ if (tmpBuf.isDirect()) {
+ long tmpPtr = bufferAddress(tmpBuf);
+
+ copyMemory(absPtr + PAGE_OVERHEAD, tmpPtr, pageSize());
+
+ assert getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO IGNITE-16612
+ assert getCrc(tmpPtr) == 0; //TODO IGNITE-16612
+ } else {
+ byte[] arr = tmpBuf.array();
+
+ assert arr.length == pageSize();
+
+ copyMemory(null, absPtr + PAGE_OVERHEAD, arr, BYTE_ARR_OFF, pageSize());
+ }
+ }
+
+ /**
+ * Get current prartition generation tag.
+ *
+ * @param seg Segment.
+ * @param fullId Full page id.
+ * @return Current partition generation tag.
+ */
+ private int generationTag(Segment seg, FullPageId fullId) {
+ return seg.partGeneration(
+ fullId.groupId(),
+ partitionId(fullId.pageId())
+ );
+ }
+
+ /**
+ * Resolver relative pointer via {@link LoadedPagesMap}.
+ *
+ * @param seg Segment.
+ * @param fullId Full page id.
+ * @param reqVer Required version.
+ * @return Relative pointer.
+ */
+ private long resolveRelativePointer(Segment seg, FullPageId fullId, int reqVer) {
+ return seg.loadedPages.get(
+ fullId.groupId(),
+ fullId.effectivePageId(),
+ reqVer,
+ INVALID_REL_PTR,
+ OUTDATED_REL_PTR
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int invalidate(int grpId, int partId) {
+ synchronized (segmentsLock) {
+ if (!started) {
+ return 0;
+ }
+
+ int tag = 0;
+
+ for (Segment seg : segments) {
+ seg.writeLock().lock();
+
+ try {
+ int newTag = seg.incrementPartGeneration(grpId, partId);
+
+ if (tag == 0) {
+ tag = newTag;
+ }
+
+ assert tag == newTag;
+ } finally {
+ seg.writeLock().unlock();
+ }
+ }
+
+ return tag;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onCacheGroupDestroyed(int grpId) {
+ for (Segment seg : segments) {
+ seg.writeLock().lock();
+
+ try {
+ seg.resetGroupPartitionsGeneration(grpId);
+ } finally {
+ seg.writeLock().unlock();
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long loadedPages() {
+ long total = 0;
+
+ Segment[] segments = this.segments;
+
+ if (segments != null) {
+ for (Segment seg : segments) {
+ if (seg == null) {
+ break;
+ }
+
+ seg.readLock().lock();
+
+ try {
+ if (seg.closed) {
+ continue;
+ }
+
+ total += seg.loadedPages.size();
+ } finally {
+ seg.readLock().unlock();
+ }
+ }
+ }
+
+ return total;
+ }
+
+ /**
+ * Returns total number of acquired pages.
+ */
+ public long acquiredPages() {
+ if (segments == null) {
+ return 0L;
+ }
+
+ long total = 0;
+
+ for (Segment seg : segments) {
+ seg.readLock().lock();
+
+ try {
+ if (seg.closed) {
+ continue;
+ }
+
+ total += seg.acquiredPages();
+ } finally {
+ seg.readLock().unlock();
+ }
+ }
+
+ return total;
+ }
+
+ /**
+ * Returns {@code true} if the page is contained in the loaded pages table, {@code false} otherwise.
+ *
+ * @param fullPageId Full page ID to check.
+ */
+ public boolean hasLoadedPage(FullPageId fullPageId) {
+ int grpId = fullPageId.groupId();
+ long pageId = fullPageId.effectivePageId();
+ int partId = partitionId(pageId);
+
+ Segment seg = segment(grpId, pageId);
+
+ seg.readLock().lock();
+
+ try {
+ long res = seg.loadedPages.get(grpId, pageId, seg.partGeneration(grpId, partId), INVALID_REL_PTR, INVALID_REL_PTR);
+
+ return res != INVALID_REL_PTR;
+ } finally {
+ seg.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long readLockForce(int grpId, long pageId, long page) {
+ assert started;
+
+ return readLock(page, pageId, true);
+ }
+
+ /**
+ * Releases read lock.
+ *
+ * @param absPtr Absolute pointer to unlock.
+ */
+ void readUnlockPage(long absPtr) {
+ rwLock.readUnlock(absPtr + PAGE_LOCK_OFFSET);
+ }
+
+ /**
+ * Checks if a page has temp copy buffer.
+ *
+ * @param absPtr Absolute pointer.
+ * @return {@code True} if a page has temp buffer.
+ */
+ public boolean hasTempCopy(long absPtr) {
+ return tempBufferPointer(absPtr) != INVALID_REL_PTR;
+ }
+
+ /**
+ * Tries to acquire a write lock.
+ *
+ * @param absPtr Absolute pointer.
+ * @return Pointer to the page write buffer or {@code 0} if page was not locked.
+ */
+ private long tryWriteLockPage(long absPtr, FullPageId fullId, boolean checkTag) {
+ int tag = checkTag ? tag(fullId.pageId()) : TAG_LOCK_ALWAYS;
+
+ return !rwLock.tryWriteLock(absPtr + PAGE_LOCK_OFFSET, tag) ? 0 : postWriteLockPage(absPtr, fullId);
+ }
+
+ /**
+ * Acquires a write lock.
+ *
+ * @param absPtr Absolute pointer.
+ * @return Pointer to the page write buffer or {@code 0} if page was not locked.
+ */
+ private long writeLockPage(long absPtr, FullPageId fullId, boolean checkTag) {
+ int tag = checkTag ? tag(fullId.pageId()) : TAG_LOCK_ALWAYS;
+
+ boolean locked = rwLock.writeLock(absPtr + PAGE_LOCK_OFFSET, tag);
+
+ return locked ? postWriteLockPage(absPtr, fullId) : 0;
+ }
+
+ private long postWriteLockPage(long absPtr, FullPageId fullId) {
+ writeTimestamp(absPtr, coarseCurrentTimeMillis());
+
+ assert getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO IGNITE-16612
+
+ return absPtr + PAGE_OVERHEAD;
+ }
+
+ private void writeUnlockPage(
+ long page,
+ FullPageId fullId,
+ boolean markDirty,
+ boolean restore
+ ) {
+ boolean wasDirty = isDirty(page);
+
+ try {
+ // If page is for restore, we shouldn't mark it as changed.
+ if (!restore && markDirty && !wasDirty && changeTracker != null) {
+ changeTracker.apply(page, fullId, this);
+ }
+
+ assert getCrc(page + PAGE_OVERHEAD) == 0; //TODO IGNITE-16612
+
+ if (markDirty) {
+ setDirty(fullId, page, true, false);
+ }
+ } finally { // Always release the lock.
+ long pageId = getPageId(page + PAGE_OVERHEAD);
+
+ try {
+ assert pageId != 0 : hexLong(PageHeader.readPageId(page));
+
+ rwLock.writeUnlock(page + PAGE_LOCK_OFFSET, tag(pageId));
+
+ assert getVersion(page + PAGE_OVERHEAD) != 0 : dumpPage(pageId, fullId.groupId());
+ assert getType(page + PAGE_OVERHEAD) != 0 : hexLong(pageId);
+ } catch (AssertionError ex) {
+ LOG.error("Failed to unlock page [fullPageId=" + fullId + ", binPage=" + toHexString(page, systemPageSize()) + ']');
+
+ throw ex;
+ }
+ }
+ }
+
+ /**
+ * Prepares page details for assertion.
+ *
+ * @param pageId Page id.
+ * @param grpId Group id.
+ */
+ private String dumpPage(long pageId, int grpId) {
+ int pageIdx = pageIndex(pageId);
+ int partId = partitionId(pageId);
+ long off = (long) (pageIdx + 1) * pageSize();
+
+ return hexLong(pageId) + " (grpId=" + grpId + ", pageIdx=" + pageIdx + ", partId=" + partId + ", offH="
+ + Long.toHexString(off) + ")";
+ }
+
+ /**
+ * Returns {@code true} if write lock acquired for the page.
+ *
+ * @param absPtr Absolute pointer to the page.
+ */
+ boolean isPageWriteLocked(long absPtr) {
+ return rwLock.isWriteLocked(absPtr + PAGE_LOCK_OFFSET);
+ }
+
+ /**
+ * Returns {@code true} if read lock acquired for the page.
+ *
+ * @param absPtr Absolute pointer to the page.
+ */
+ boolean isPageReadLocked(long absPtr) {
+ return rwLock.isReadLocked(absPtr + PAGE_LOCK_OFFSET);
+ }
+
+ /**
+ * Returns the number of active pages across all segments. Used for test purposes only.
+ */
+ public int activePagesCount() {
+ if (segments == null) {
+ return 0;
+ }
+
+ int total = 0;
+
+ for (Segment seg : segments) {
+ total += seg.acquiredPages();
+ }
+
+ return total;
+ }
+
+ /**
+ * This method must be called in synchronized context.
+ *
+ * @param pageId full page ID.
+ * @param absPtr Absolute pointer.
+ * @param dirty {@code True} dirty flag.
+ * @param forceAdd If this flag is {@code true}, then the page will be added to the dirty set regardless whether the old flag was dirty
+ * or not.
+ */
+ private void setDirty(FullPageId pageId, long absPtr, boolean dirty, boolean forceAdd) {
+ boolean wasDirty = dirty(absPtr, dirty);
+
+ if (dirty) {
+ if (!wasDirty || forceAdd) {
+ Segment seg = segment(pageId.groupId(), pageId.pageId());
+
+ if (seg.dirtyPages.add(pageId)) {
+ seg.dirtyPagesCntr.incrementAndGet();
+ }
+ }
+ } else {
+ Segment seg = segment(pageId.groupId(), pageId.pageId());
+
+ if (seg.dirtyPages.remove(pageId)) {
+ seg.dirtyPagesCntr.decrementAndGet();
+ }
+ }
+ }
+
+ private Segment segment(int grpId, long pageId) {
+ int idx = segmentIndex(grpId, pageId, segments.length);
+
+ return segments[idx];
+ }
+
+ private static int segmentIndex(int grpId, long pageId, int segments) {
+ pageId = effectivePageId(pageId);
+
+ // Take a prime number larger than total number of partitions.
+ int hash = hash(pageId * 65537 + grpId);
+
+ return safeAbs(hash) % segments;
+ }
+
+ /**
+ * Returns a collection of all pages currently marked as dirty. Will create a collection copy.
+ */
+ @TestOnly
+ public Collection<FullPageId> dirtyPages() {
+ if (segments == null) {
+ return Collections.emptySet();
+ }
+
+ Collection<FullPageId> res = new HashSet<>((int) loadedPages());
+
+ for (Segment seg : segments) {
+ res.addAll(seg.dirtyPages);
+ }
+
+ return res;
+ }
+
+ /**
+ * Page segment.
+ */
+ class Segment extends ReentrantReadWriteLock {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Pointer to acquired pages integer counter. */
+ private static final int ACQUIRED_PAGES_SIZEOF = 4;
+
+ /** Padding to read from word beginning. */
+ private static final int ACQUIRED_PAGES_PADDING = 4;
+
+ /** Page ID to relative pointer map. */
+ private final LoadedPagesMap loadedPages;
+
+ /** Pointer to acquired pages integer counter. */
+ private final long acquiredPagesPtr;
+
+ /** Page pool. */
+ private final PagePool pool;
+
+ /** Page replacement policy. */
+ private final PageReplacementPolicy pageReplacementPolicy;
+
+ /** Bytes required to store {@link #loadedPages}. */
+ private final long memPerTbl;
+
+ /** Bytes required to store {@link #pageReplacementPolicy} service data. */
+ private long memPerRepl;
+
+ /** Pages marked as dirty since the last checkpoint. */
+ private volatile Collection<FullPageId> dirtyPages = ConcurrentHashMap.newKeySet();
+
+ /** Atomic size counter for {@link #dirtyPages}. */
+ private final AtomicLong dirtyPagesCntr = new AtomicLong();
+
+ /** Maximum number of dirty pages. */
+ private final long maxDirtyPages;
+
+ /** Initial partition generation. */
+ private static final int INIT_PART_GENERATION = 1;
+
+ /** Maps partition (grpId, partId) to its generation. Generation is 1-based incrementing partition counter. */
+ private final Map<GroupPartitionId, Integer> partGenerationMap = new HashMap<>();
+
+ /** Segment closed flag. */
+ private boolean closed;
+
+ /**
+ * Constructor.
+ *
+ * @param idx Segment index.
+ * @param region Memory region.
+ */
+ private Segment(int idx, DirectMemoryRegion region) {
+ long totalMemory = region.size();
+
+ int pages = (int) (totalMemory / sysPageSize);
+
+ acquiredPagesPtr = region.address();
+
+ putIntVolatile(null, acquiredPagesPtr, 0);
+
+ int ldPagesMapOffInRegion = ACQUIRED_PAGES_SIZEOF + ACQUIRED_PAGES_PADDING;
+
+ long ldPagesAddr = region.address() + ldPagesMapOffInRegion;
+
+ memPerTbl = RobinHoodBackwardShiftHashMap.requiredMemory(pages);
+
+ loadedPages = new RobinHoodBackwardShiftHashMap(ldPagesAddr, memPerTbl);
+
+ pages = (int) ((totalMemory - memPerTbl - ldPagesMapOffInRegion) / sysPageSize);
+
+ memPerRepl = pageReplacementPolicyFactory.requiredMemory(pages);
+
+ DirectMemoryRegion poolRegion = region.slice(memPerTbl + memPerRepl + ldPagesMapOffInRegion);
+
+ pool = new PagePool(idx, poolRegion, sysPageSize, rwLock);
+
+ pageReplacementPolicy = pageReplacementPolicyFactory.create(
+ this,
+ region.address() + memPerTbl + ldPagesMapOffInRegion,
+ pool.pages()
+ );
+
+ maxDirtyPages = pool.pages() * 3L / 4;
+ }
+
+ /**
+ * Closes the segment.
+ */
+ private void close() {
+ writeLock().lock();
+
+ try {
+ closed = true;
+ } finally {
+ writeLock().unlock();
+ }
+ }
+
+ /**
+ * Returns dirtyRatio to be compared with Throttle threshold.
+ */
+ private double getDirtyPagesRatio() {
+ return dirtyPagesCntr.doubleValue() / pages();
+ }
+
+ /**
+ * Returns max number of pages this segment can allocate.
+ */
+ private int pages() {
+ return pool.pages();
+ }
+
+ /**
+ * Returns memory allocated for pages table.
+ */
+ private long tableSize() {
+ return memPerTbl;
+ }
+
+ /**
+ * Returns memory allocated for page replacement service data.
+ */
+ private long replacementSize() {
+ return memPerRepl;
+ }
+
+ private void acquirePage(long absPtr) {
+ PageHeader.acquirePage(absPtr);
+
+ updateAtomicInt(acquiredPagesPtr, 1);
+ }
+
+ private void releasePage(long absPtr) {
+ PageHeader.releasePage(absPtr);
+
+ updateAtomicInt(acquiredPagesPtr, -1);
+ }
+
+ /**
+ * Returns total number of acquired pages.
+ */
+ private int acquiredPages() {
+ return getInt(acquiredPagesPtr);
+ }
+
+ /**
+ * Allocates a new free page.
+ *
+ * @param pageId Page ID.
+ * @return Page relative pointer.
+ */
+ private long borrowOrAllocateFreePage(long pageId) {
+ return pool.borrowOrAllocateFreePage(tag(pageId));
+ }
+
+ /**
+ * Clear dirty pages collection and reset counter.
+ */
+ private void resetDirtyPages() {
+ dirtyPages = ConcurrentHashMap.newKeySet();
+
+ dirtyPagesCntr.set(0);
+ }
+
+ /**
+ * Prepares a page removal for page replacement, if needed.
+ *
+ * @param fullPageId Candidate page full ID.
+ * @param absPtr Absolute pointer of the page to evict.
+ * @return {@code True} if it is ok to replace this page, {@code false} if another page should be selected.
+ * @throws IgniteInternalCheckedException If failed to write page to the underlying store during eviction.
+ */
+ public boolean tryToRemovePage(FullPageId fullPageId, long absPtr) throws IgniteInternalCheckedException {
+ assert writeLock().isHeldByCurrentThread();
+
+ // Do not evict group meta pages.
+ if (fullPageId.pageId() == META_PAGE_ID) {
+ return false;
+ }
+
+ if (isAcquired(absPtr)) {
+ return false;
+ }
+
+ if (isDirty(absPtr)) {
+ return false;
+ }
+
+ loadedPages.remove(fullPageId.groupId(), fullPageId.effectivePageId());
+
+ return true;
+ }
+
+ /**
+ * Refresh outdated value.
+ *
+ * @param grpId Group ID.
+ * @param pageId Page ID.
+ * @param rmv {@code True} if page should be removed.
+ * @return Relative pointer to refreshed page.
+ */
+ public long refreshOutdatedPage(int grpId, long pageId, boolean rmv) {
+ assert writeLock().isHeldByCurrentThread();
+
+ int tag = partGeneration(grpId, partitionId(pageId));
+
+ long relPtr = loadedPages.refresh(grpId, effectivePageId(pageId), tag);
+
+ long absPtr = absolute(relPtr);
+
+ setMemory(absPtr + PAGE_OVERHEAD, pageSize(), (byte) 0);
+
+ dirty(absPtr, false);
+
+ if (rmv) {
+ loadedPages.remove(grpId, effectivePageId(pageId));
+ }
+
+ Collection<FullPageId> dirtyPages = this.dirtyPages;
+
+ if (dirtyPages != null) {
+ if (dirtyPages.remove(new FullPageId(pageId, grpId))) {
+ dirtyPagesCntr.decrementAndGet();
+ }
+ }
+
+ return relPtr;
+ }
+
+ /**
+ * Removes random oldest page for page replacement from memory to storage.
+ *
+ * @return Relative address for removed page, now it can be replaced by allocated or reloaded page.
+ * @throws IgniteInternalCheckedException If failed to evict page.
+ */
+ private long removePageForReplacement() throws IgniteInternalCheckedException {
+ assert getWriteHoldCount() > 0;
+
+ if (pageReplacementWarned == 0) {
+ if (pageReplacementWarnedFieldUpdater.compareAndSet(PageMemoryImpl.this, 0, 1)) {
+ String msg = "Page replacements started, pages will be rotated with disk, this will affect "
+ + "storage performance (consider increasing PageMemoryDataRegionConfiguration#setMaxSize for "
+ + "data region): " + dataRegionCfg.name();
+
+ LOG.warn(msg);
+ }
+ }
+
+ if (acquiredPages() >= loadedPages.size()) {
+ throw oomException("all pages are acquired");
+ }
+
+ return pageReplacementPolicy.replace();
+ }
+
+ /**
+ * Creates out of memory exception with additional information.
+ *
+ * @param reason Reason.
+ */
+ public IgniteOutOfMemoryException oomException(String reason) {
+ return new IgniteOutOfMemoryException("Failed to find a page for eviction (" + reason + ") ["
+ + "segmentCapacity=" + loadedPages.capacity()
+ + ", loaded=" + loadedPages.size()
+ + ", maxDirtyPages=" + maxDirtyPages
+ + ", dirtyPages=" + dirtyPagesCntr
+ + ", pinned=" + acquiredPages()
+ + ']' + lineSeparator() + "Out of memory in data region ["
+ + "name=" + dataRegionCfg.name()
+ + ", initSize=" + readableSize(dataRegionCfg.initSize(), false)
+ + ", maxSize=" + readableSize(dataRegionCfg.maxSize(), false)
+ + ", persistenceEnabled=" + dataRegionCfg.persistent() + "] Try the following:" + lineSeparator()
+ + " ^-- Increase maximum off-heap memory size (PageMemoryDataRegionConfiguration.maxSize)" + lineSeparator()
+ + " ^-- Enable eviction or expiration policies"
+ );
+ }
+
+ /**
+ * Delegate to the corresponding page pool.
+ *
+ * @param relPtr Relative pointer.
+ * @return Absolute pointer.
+ */
+ public long absolute(long relPtr) {
+ return pool.absolute(relPtr);
+ }
+
+ /**
+ * Delegate to the corresponding page pool.
+ *
+ * @param pageIdx Page index.
+ * @return Relative pointer.
+ */
+ public long relative(long pageIdx) {
+ return pool.relative(pageIdx);
+ }
+
+ /**
+ * Delegate to the corresponding page pool.
+ *
+ * @param relPtr Relative pointer.
+ * @return Page index in the pool.
+ */
+ public long pageIndex(long relPtr) {
+ return pool.pageIndex(relPtr);
+ }
+
+ /**
+ * Returns partition generation. Growing, 1-based partition version.
+ *
+ * @param grpId Group ID.
+ * @param partId Partition ID.
+ */
+ public int partGeneration(int grpId, int partId) {
+ assert getReadHoldCount() > 0 || getWriteHoldCount() > 0;
+
+ Integer tag = partGenerationMap.get(new GroupPartitionId(grpId, partId));
+
+ assert tag == null || tag >= 0 : "Negative tag=" + tag;
+
+ return tag == null ? INIT_PART_GENERATION : tag;
+ }
+
+ /**
+ * Gets loaded pages map.
+ */
+ public LoadedPagesMap loadedPages() {
+ return loadedPages;
+ }
+
+ /**
+ * Gets page pool.
+ */
+ public PagePool pool() {
+ return pool;
+ }
+
+ /**
+ * Increments partition generation due to partition invalidation (e.g. partition was rebalanced to other node and evicted).
+ *
+ * @param grpId Group ID.
+ * @param partId Partition ID.
+ * @return New partition generation.
+ */
+ private int incrementPartGeneration(int grpId, int partId) {
+ assert getWriteHoldCount() > 0;
+
+ GroupPartitionId grpPart = new GroupPartitionId(grpId, partId);
+
+ Integer gen = partGenerationMap.get(grpPart);
+
+ if (gen == null) {
+ gen = INIT_PART_GENERATION;
+ }
+
+ if (gen == Integer.MAX_VALUE) {
+ LOG.warn("Partition tag overflow [grpId=" + grpId + ", partId=" + partId + "]");
+
+ partGenerationMap.put(grpPart, 0);
+
+ return 0;
+ } else {
+ partGenerationMap.put(grpPart, gen + 1);
+
+ return gen + 1;
+ }
+ }
+
+ private void resetGroupPartitionsGeneration(int grpId) {
+ assert getWriteHoldCount() > 0;
+
+ partGenerationMap.keySet().removeIf(grpPart -> grpPart.getGroupId() == grpId);
+ }
+
+ /**
+ * Returns IO registry.
+ */
+ PageIoRegistry ioRegistry() {
+ return ioRegistry;
+ }
+ }
+
+ private static int updateAtomicInt(long ptr, int delta) {
+ while (true) {
+ int old = getInt(ptr);
+
+ int updated = old + delta;
+
+ if (compareAndSwapInt(null, ptr, old, updated)) {
+ return updated;
+ }
+ }
+ }
+
+ private static long updateAtomicLong(long ptr, long delta) {
+ while (true) {
+ long old = GridUnsafe.getLong(ptr);
+
+ long updated = old + delta;
+
+ if (compareAndSwapLong(null, ptr, old, updated)) {
+ return updated;
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public PageIoRegistry ioRegistry() {
+ return ioRegistry;
+ }
+
+ /**
+ * Callback invoked to track changes in pages.
+ */
+ @FunctionalInterface
+ public interface PageChangeTracker {
+ /**
+ * Callback body.
+ *
+ * @param page – Page pointer.
+ * @param fullPageId Full page ID.
+ * @param pageMemoryEx Page memory.
+ */
+ void apply(long page, FullPageId fullPageId, PageMemoryEx pageMemoryEx);
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PagePool.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PagePool.java
new file mode 100644
index 0000000..73bf6a1
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PagePool.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.fullPageId;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.initNew;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.isAcquired;
+import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.INVALID_REL_PTR;
+import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.PAGE_LOCK_OFFSET;
+import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.RELATIVE_PTR_MASK;
+import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapLong;
+import static org.apache.ignite.internal.util.GridUnsafe.getLongVolatile;
+import static org.apache.ignite.internal.util.GridUnsafe.putLong;
+import static org.apache.ignite.internal.util.GridUnsafe.putLongVolatile;
+import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.pagememory.mem.DirectMemoryRegion;
+import org.apache.ignite.internal.util.OffheapReadWriteLock;
+
+/**
+ * Page pool.
+ */
+public class PagePool {
+ /** Relative pointer chunk index mask. */
+ static final long SEGMENT_INDEX_MASK = 0xFFFFFF0000000000L;
+
+ /** Address mask to avoid ABA problem. */
+ private static final long ADDRESS_MASK = 0xFFFFFFFFFFFFFFL;
+
+ /** Counter increment to avoid ABA problem. */
+ private static final long COUNTER_INC = ADDRESS_MASK + 1;
+
+ /** Counter mask to avoid ABA problem. */
+ private static final long COUNTER_MASK = ~ADDRESS_MASK;
+
+ /** Segment index. */
+ protected final int idx;
+
+ /** Direct memory region. */
+ protected final DirectMemoryRegion region;
+
+ /** Pool pages counter. */
+ protected final AtomicInteger pagesCntr = new AtomicInteger();
+
+ /** Index of the last allocated page pointer. */
+ protected long lastAllocatedIdxPtr;
+
+ /** Pointer to the address of the free page list. */
+ protected long freePageListPtr;
+
+ /** Pages base. */
+ protected long pagesBase;
+
+ /** System page size. */
+ private final int sysPageSize;
+
+ /** Instance of RW Lock Updater. */
+ private OffheapReadWriteLock rwLock;
+
+ /**
+ * Constructor.
+ *
+ * @param idx Index.
+ * @param region Region
+ * @param sysPageSize System page size.
+ * @param rwLock Instance of RW Lock Updater.
+ */
+ protected PagePool(
+ int idx,
+ DirectMemoryRegion region,
+ int sysPageSize,
+ OffheapReadWriteLock rwLock
+ ) {
+ this.idx = idx;
+ this.region = region;
+ this.sysPageSize = sysPageSize;
+ this.rwLock = rwLock;
+
+ long base = (region.address() + 7) & ~0x7;
+
+ freePageListPtr = base;
+
+ base += 8;
+
+ lastAllocatedIdxPtr = base;
+
+ base += 8;
+
+ // Align page start by
+ pagesBase = base;
+
+ putLong(freePageListPtr, INVALID_REL_PTR);
+ putLong(lastAllocatedIdxPtr, 0L);
+ }
+
+ /**
+ * Allocates a new free page.
+ *
+ * @param tag Tag to initialize page RW lock.
+ * @return Relative pointer to the allocated page.
+ */
+ public long borrowOrAllocateFreePage(int tag) {
+ long relPtr = borrowFreePage();
+
+ if (relPtr == INVALID_REL_PTR) {
+ relPtr = allocateFreePage(tag);
+ }
+
+ if (relPtr != INVALID_REL_PTR && pagesCntr != null) {
+ pagesCntr.incrementAndGet();
+ }
+
+ return relPtr;
+ }
+
+ /**
+ * Returns relative pointer to a free page that was borrowed from the allocated pool.
+ */
+ private long borrowFreePage() {
+ while (true) {
+ long freePageRelPtrMasked = getLongVolatile(null, freePageListPtr);
+
+ long freePageRelPtr = freePageRelPtrMasked & ADDRESS_MASK;
+
+ if (freePageRelPtr != INVALID_REL_PTR) {
+ long freePageAbsPtr = absolute(freePageRelPtr);
+
+ long nextFreePageRelPtr = getLongVolatile(null, freePageAbsPtr) & ADDRESS_MASK;
+
+ // nextFreePageRelPtr may be invalid because a concurrent thread may have already polled this value
+ // and used it.
+ long cnt = ((freePageRelPtrMasked & COUNTER_MASK) + COUNTER_INC) & COUNTER_MASK;
+
+ if (compareAndSwapLong(null, freePageListPtr, freePageRelPtrMasked, nextFreePageRelPtr | cnt)) {
+ putLongVolatile(null, freePageAbsPtr, PageHeader.PAGE_MARKER);
+
+ return freePageRelPtr;
+ }
+ } else {
+ return INVALID_REL_PTR;
+ }
+ }
+ }
+
+ /**
+ * Allocates a new free page.
+ *
+ * @param tag Tag to initialize page RW lock.
+ * @return Relative pointer of the allocated page.
+ */
+ private long allocateFreePage(int tag) {
+ long limit = region.address() + region.size();
+
+ while (true) {
+ long lastIdx = getLongVolatile(null, lastAllocatedIdxPtr);
+
+ // Check if we have enough space to allocate a page.
+ if (pagesBase + (lastIdx + 1) * sysPageSize > limit) {
+ return INVALID_REL_PTR;
+ }
+
+ if (compareAndSwapLong(null, lastAllocatedIdxPtr, lastIdx, lastIdx + 1)) {
+ long absPtr = pagesBase + lastIdx * sysPageSize;
+
+ assert (lastIdx & SEGMENT_INDEX_MASK) == 0L;
+
+ long relative = relative(lastIdx);
+
+ assert relative != INVALID_REL_PTR;
+
+ initNew(absPtr, relative);
+
+ rwLock.init(absPtr + PAGE_LOCK_OFFSET, tag);
+
+ return relative;
+ }
+ }
+ }
+
+ /**
+ * Returns resulting number of pages in pool if pages counter is enabled, 0 otherwise.
+ *
+ * @param relPtr Relative pointer to free.
+ */
+ public int releaseFreePage(long relPtr) {
+ long absPtr = absolute(relPtr);
+
+ assert !isAcquired(absPtr) : "Release pinned page: " + fullPageId(absPtr);
+
+ int resCntr = 0;
+
+ if (pagesCntr != null) {
+ resCntr = pagesCntr.decrementAndGet();
+ }
+
+ while (true) {
+ long freePageRelPtrMasked = getLongVolatile(null, freePageListPtr);
+
+ long freePageRelPtr = freePageRelPtrMasked & RELATIVE_PTR_MASK;
+
+ putLongVolatile(null, absPtr, freePageRelPtr);
+
+ long cnt = freePageRelPtrMasked & COUNTER_MASK;
+
+ long relPtrWithCnt = (relPtr & ADDRESS_MASK) | cnt;
+
+ if (compareAndSwapLong(null, freePageListPtr, freePageRelPtrMasked, relPtrWithCnt)) {
+ return resCntr;
+ }
+ }
+ }
+
+ /**
+ * Returns absolute pointer.
+ *
+ * @param relativePtr Relative pointer.
+ */
+ long absolute(long relativePtr) {
+ int segIdx = (int) ((relativePtr >> 40) & 0xFFFF);
+
+ assert segIdx == idx : "expected=" + idx + ", actual=" + segIdx + ", relativePtr=" + hexLong(relativePtr);
+
+ long pageIdx = relativePtr & ~SEGMENT_INDEX_MASK;
+
+ long off = pageIdx * sysPageSize;
+
+ return pagesBase + off;
+ }
+
+ /**
+ * Returns relative pointer.
+ *
+ * @param pageIdx Page index in the pool.
+ */
+ long relative(long pageIdx) {
+ return pageIdx | ((long) idx) << 40;
+ }
+
+ /**
+ * Returns page index in the pool.
+ *
+ * @param relPtr Relative pointer.
+ */
+ long pageIndex(long relPtr) {
+ return relPtr & ~SEGMENT_INDEX_MASK;
+ }
+
+ /**
+ * Returns max number of pages in the pool.
+ */
+ public int pages() {
+ return (int) ((region.size() - (pagesBase - region.address())) / sysPageSize);
+ }
+
+ /**
+ * Returns number of pages in the list.
+ */
+ public int size() {
+ return pagesCntr.get();
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageReadWriteManager.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageReadWriteManager.java
new file mode 100644
index 0000000..fe6d176
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageReadWriteManager.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.pagememory.PageIdAllocator;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * Reads, writes and allocates pages of the underlying page storage.
+ */
+public interface PageReadWriteManager {
+ /**
+ * Reads a page for the given group ID into the given buffer.
+ *
+ * @param grpId Group ID, may be {@code 0} if the page is a meta page.
+ * @param pageId Page ID to read.
+ * @param pageBuf Page buffer the page content is read into.
+ * @param keepCrc Keep CRC flag; presumably controls whether the stored page CRC is kept in the buffer after reading
+ * (confirm against implementations).
+ * @throws IgniteInternalCheckedException If failed to read the page.
+ */
+ void read(int grpId, long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException;
+
+ /**
+ * Writes the page for the given group ID.
+ *
+ * @param grpId Group ID, may be {@code 0} if the page is a meta page.
+ * @param pageId Page ID.
+ * @param pageBuf Page buffer to write.
+ * @param tag Page tag; presumably the partition generation the page was written for (confirm against
+ * implementations).
+ * @param calculateCrc Whether the page CRC should be calculated before writing (inferred from the name; confirm
+ * against implementations).
+ * @throws IgniteInternalCheckedException If failed to write page.
+ */
+ void write(int grpId, long pageId, ByteBuffer pageBuf, int tag, boolean calculateCrc)
+ throws IgniteInternalCheckedException;
+
+ /**
+ * Allocates a page for the given page space.
+ *
+ * @param grpId Group ID.
+ * @param partId Partition ID. Used only if {@code flags} is equal to {@link PageIdAllocator#FLAG_DATA}.
+ * @param flags Page allocation flags.
+ * @return Allocated page ID.
+ * @throws IgniteInternalCheckedException If IO exception occurred while allocating a page ID.
+ */
+ long allocatePage(int grpId, int partId, byte flags) throws IgniteInternalCheckedException;
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageReplacementPolicy.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageReplacementPolicy.java
new file mode 100644
index 0000000..6cca28c
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageReplacementPolicy.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.Segment;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * Base class for page replacement policies of a page memory segment.
+ *
+ * <p>The notification callbacks are no-ops by default; subclasses override the ones they need and must implement
+ * {@link #replace()}.
+ */
+public abstract class PageReplacementPolicy {
+ /** Page memory segment. */
+ protected final Segment seg;
+
+ /**
+ * Constructor.
+ *
+ * @param seg Page memory segment.
+ */
+ protected PageReplacementPolicy(Segment seg) {
+ this.seg = seg;
+ }
+
+ /**
+ * Notifies the policy that an existing page was touched.
+ *
+ * <p>Note: This method can be invoked under the segment write lock or the segment read lock.
+ *
+ * @param relPtr Relative pointer to page.
+ */
+ public void onHit(long relPtr) {
+ // No-op.
+ }
+
+ /**
+ * Notifies the policy that a new page was added.
+ *
+ * <p>Note: This method is always invoked under the segment write lock.
+ *
+ * @param relPtr Relative pointer to page.
+ */
+ public void onMiss(long relPtr) {
+ // No-op.
+ }
+
+ /**
+ * Notifies the policy that a page was removed from the page memory.
+ *
+ * @param relPtr Relative pointer to page.
+ */
+ public void onRemove(long relPtr) {
+ // No-op.
+ }
+
+ /**
+ * Finds a page to replace.
+ *
+ * <p>Note: This method is always invoked under the segment write lock.
+ *
+ * @return Relative pointer to the page chosen for replacement.
+ * @throws IgniteInternalCheckedException If the replacement fails.
+ */
+ public abstract long replace() throws IgniteInternalCheckedException;
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageReplacementPolicyFactory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageReplacementPolicyFactory.java
new file mode 100644
index 0000000..625c357
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageReplacementPolicyFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.Segment;
+
+/**
+ * Factory of {@link PageReplacementPolicy} instances for page memory segments.
+ */
+public interface PageReplacementPolicyFactory {
+ /**
+ * Calculates the amount of memory required by the policy to service {@code pagesCnt} pages.
+ *
+ * @param pagesCnt Pages count.
+ * @return Required amount of memory (presumably in bytes); may be {@code 0} if the policy needs no extra memory.
+ */
+ long requiredMemory(int pagesCnt);
+
+ /**
+ * Creates a page replacement policy for the given segment.
+ *
+ * @param seg Page memory segment.
+ * @param ptr Pointer to memory region; presumably the region sized by {@link #requiredMemory(int)} (confirm against
+ * callers).
+ * @param pagesCnt Pages count.
+ * @return Page replacement policy.
+ */
+ PageReplacementPolicy create(Segment seg, long ptr, int pagesCnt);
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicy.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicy.java
new file mode 100644
index 0000000..8ecae86
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicy.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.META_PAGE_ID;
+import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.INVALID_REL_PTR;
+import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.PAGE_OVERHEAD;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.freelist.io.PagesListMetaIo;
+import org.apache.ignite.internal.pagememory.io.PageIo;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.Segment;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * Random-LRU page replacement policy implementation.
+ *
+ * <p>Each round samples {@link #RANDOM_PAGES_EVICT_NUM} random loaded pages of the segment and tries to remove the one with
+ * the smallest page timestamp, preferring regular pages over pages-list metadata pages. Pinned, dirty and
+ * {@code META_PAGE_ID} pages are never chosen by sampling. A sampled page of an outdated partition generation is refreshed
+ * and returned immediately. When sampling exceeds {@code FULL_SCAN_THRESHOLD} of the pool pages, the segment is scanned
+ * sequentially instead.
+ */
+public class RandomLruPageReplacementPolicy extends PageReplacementPolicy {
+ /** Number of random pages that will be picked for eviction. */
+ public static final int RANDOM_PAGES_EVICT_NUM = 5;
+
+ /** Fraction of the pool pages that may be probed randomly before falling back to a sequential scan. */
+ private static final double FULL_SCAN_THRESHOLD = 0.4;
+
+ /**
+ * Constructor.
+ *
+ * @param seg Page memory segment.
+ */
+ protected RandomLruPageReplacementPolicy(Segment seg) {
+ super(seg);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long replace() throws IgniteInternalCheckedException {
+ final ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ LoadedPagesMap loadedPages = seg.loadedPages();
+ PagePool pool = seg.pool();
+
+ final int cap = loadedPages.capacity();
+
+ // With big number of random picked pages we may fall into infinite loop, because
+ // every time the same page may be found.
+ Set<Long> ignored = null;
+
+ long relRmvAddr = INVALID_REL_PTR;
+
+ int iterations = 0;
+
+ while (true) {
+ // Best candidates of this sampling round. Dirty pages are skipped below, so only two tiers exist:
+ // regular pages and pages-list metadata pages (the latter are used only if no regular page was sampled).
+ long cleanAddr = INVALID_REL_PTR;
+ long cleanTs = Long.MAX_VALUE;
+ long metaAddr = INVALID_REL_PTR;
+ long metaTs = Long.MAX_VALUE;
+
+ for (int i = 0; i < RANDOM_PAGES_EVICT_NUM; i++) {
+ ++iterations;
+
+ if (iterations > pool.pages() * FULL_SCAN_THRESHOLD) {
+ break;
+ }
+
+ // We need to lookup for pages only in current segment for thread safety,
+ // so peeking random memory will lead to checking for found page segment.
+ // It's much faster to check available pages for segment right away.
+ ReplaceCandidate nearest = loadedPages.getNearestAt(rnd.nextInt(cap));
+
+ assert nearest != null && nearest.relativePointer() != INVALID_REL_PTR;
+
+ long rndAddr = nearest.relativePointer();
+
+ int partGen = nearest.generation();
+
+ final long absPageAddr = seg.absolute(rndAddr);
+
+ FullPageId fullId = PageHeader.fullPageId(absPageAddr);
+
+ // Check page mapping consistency.
+ assert fullId.equals(nearest.fullId()) : "Invalid page mapping [expected=" + nearest.fullId()
+ + ", actual=" + fullId + ", nearest=" + nearest + ']';
+
+ boolean outdated = partGen < seg.partGeneration(fullId.groupId(), partitionId(fullId.pageId()));
+
+ if (outdated) {
+ // A page of an outdated partition generation is refreshed and handed out without further sampling.
+ return seg.refreshOutdatedPage(fullId.groupId(), fullId.pageId(), true);
+ }
+
+ boolean pinned = PageHeader.isAcquired(absPageAddr);
+
+ boolean skip = ignored != null && ignored.contains(rndAddr);
+
+ boolean dirty = PageHeader.dirty(absPageAddr);
+
+ // Unsuitable probes do not count towards the sample size, but still count towards the iteration limit.
+ if (relRmvAddr == rndAddr || pinned || skip || fullId.pageId() == META_PAGE_ID || dirty) {
+ i--;
+
+ continue;
+ }
+
+ final long pageTs = PageHeader.readTimestamp(absPageAddr);
+
+ if (isStoreMetadataPage(absPageAddr)) {
+ if (pageTs < metaTs) {
+ metaAddr = rndAddr;
+
+ metaTs = pageTs;
+ }
+ } else if (pageTs < cleanTs) {
+ cleanAddr = rndAddr;
+
+ cleanTs = pageTs;
+ }
+
+ relRmvAddr = cleanAddr != INVALID_REL_PTR ? cleanAddr : metaAddr;
+ }
+
+ if (relRmvAddr == INVALID_REL_PTR) {
+ return tryToFindSequentially(cap);
+ }
+
+ final long absRmvAddr = seg.absolute(relRmvAddr);
+
+ final FullPageId fullPageId = PageHeader.fullPageId(absRmvAddr);
+
+ if (seg.tryToRemovePage(fullPageId, absRmvAddr)) {
+ return relRmvAddr;
+ }
+
+ // Removal failed: once enough probes were made, remember the address so that it is not sampled again.
+ if (iterations > 10) {
+ if (ignored == null) {
+ ignored = new HashSet<>();
+ }
+
+ ignored.add(relRmvAddr);
+ }
+
+ if (iterations > pool.pages() * FULL_SCAN_THRESHOLD) {
+ return tryToFindSequentially(cap);
+ }
+ }
+ }
+
+ /**
+ * Returns {@code true} if the page holds pages-list metadata ({@link PagesListMetaIo}).
+ *
+ * <p>Best effort: if the page IO cannot be resolved, the page is treated as a regular page.
+ *
+ * @param absPageAddr Absolute page address.
+ */
+ private boolean isStoreMetadataPage(long absPageAddr) {
+ try {
+ long dataAddr = absPageAddr + PAGE_OVERHEAD;
+
+ PageIo io = seg.ioRegistry().resolve(dataAddr);
+
+ return io instanceof PagesListMetaIo;
+ } catch (IgniteInternalCheckedException ignored) {
+ // Deliberately ignored: an unresolvable IO keeps the page an ordinary eviction candidate.
+ return false;
+ }
+ }
+
+ /**
+ * Scans all segment pages sequentially to find one to evict.
+ *
+ * <p>Unlike sampling, this scan does not skip dirty or metadata pages itself; the segment's {@code tryToRemovePage}
+ * decides whether a page can be removed. If no page can be removed, the segment's out-of-memory exception is thrown.
+ *
+ * @param cap Capacity of the loaded pages map.
+ * @return Relative pointer to the removed (or refreshed outdated) page.
+ * @throws IgniteInternalCheckedException If failed to replace a page.
+ */
+ private long tryToFindSequentially(int cap) throws IgniteInternalCheckedException {
+ assert seg.getWriteHoldCount() > 0;
+
+ long prevAddr = INVALID_REL_PTR;
+
+ LoadedPagesMap loadedPages = seg.loadedPages();
+
+ for (int i = 0; i < cap; i++) {
+ final ReplaceCandidate nearest = loadedPages.getNearestAt(i);
+
+ assert nearest != null && nearest.relativePointer() != INVALID_REL_PTR;
+
+ final long addr = nearest.relativePointer();
+
+ int partGen = nearest.generation();
+
+ final long absPageAddr = seg.absolute(addr);
+
+ FullPageId fullId = PageHeader.fullPageId(absPageAddr);
+
+ if (partGen < seg.partGeneration(fullId.groupId(), partitionId(fullId.pageId()))) {
+ return seg.refreshOutdatedPage(fullId.groupId(), fullId.pageId(), true);
+ }
+
+ // getNearestAt may return the same entry for several consecutive indexes: skip a candidate that just failed.
+ if (addr == prevAddr || PageHeader.isAcquired(absPageAddr)) {
+ continue;
+ }
+
+ if (seg.tryToRemovePage(fullId, absPageAddr)) {
+ return addr;
+ }
+
+ prevAddr = addr;
+ }
+
+ throw seg.oomException("no pages to replace");
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicyFactory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicyFactory.java
new file mode 100644
index 0000000..651cc14
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicyFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.Segment;
+
+/**
+ * Creates {@link RandomLruPageReplacementPolicy} instances.
+ *
+ * <p>The policy needs no memory of its own, so {@link #requiredMemory(int)} always reports zero.
+ */
+public class RandomLruPageReplacementPolicyFactory implements PageReplacementPolicyFactory {
+ /** {@inheritDoc} */
+ @Override
+ public PageReplacementPolicy create(Segment seg, long ptr, int pagesCnt) {
+ // ptr and pagesCnt are unused: the policy samples the segment's loaded pages directly.
+ return new RandomLruPageReplacementPolicy(seg);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long requiredMemory(int pagesCnt) {
+ return 0L;
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ReplaceCandidate.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ReplaceCandidate.java
new file mode 100644
index 0000000..6127103
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ReplaceCandidate.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
+
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Replacement removal candidate. Class represents some page from loaded pages table. Usually candidate is found during random {@link
+ * LoadedPagesMap} touch.
+ *
+ * <p>All fields are final and set at construction.
+ */
+public class ReplaceCandidate {
+ /** Partition generation saved in map, too old value means page may be safely cleared. */
+ private final int gen;
+
+ /** Relative pointer to the page. */
+ @IgniteToStringInclude
+ private final long relPtr;
+
+ /** Full page ID. */
+ @IgniteToStringInclude
+ private final FullPageId fullId;
+
+ /**
+ * Constructor.
+ *
+ * @param gen Partition generation.
+ * @param relPtr Relative pointer to page.
+ * @param fullId Full page ID.
+ */
+ public ReplaceCandidate(int gen, long relPtr, FullPageId fullId) {
+ this.gen = gen;
+ this.relPtr = relPtr;
+ this.fullId = fullId;
+ }
+
+ /**
+ * Returns partition generation saved in map, too old value means page may be safely cleared.
+ */
+ public int generation() {
+ return gen;
+ }
+
+ /**
+ * Returns relative pointer to page.
+ */
+ public long relativePointer() {
+ return relPtr;
+ }
+
+ /**
+ * Returns full page ID.
+ */
+ public FullPageId fullId() {
+ return fullId;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ return S.toString(ReplaceCandidate.class, this, "relPtr", hexLong(relPtr));
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RobinHoodBackwardShiftHashMap.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RobinHoodBackwardShiftHashMap.java
new file mode 100644
index 0000000..3c2b1b9
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RobinHoodBackwardShiftHashMap.java
@@ -0,0 +1,700 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static org.apache.ignite.internal.util.GridUnsafe.getInt;
+import static org.apache.ignite.internal.util.GridUnsafe.getLong;
+import static org.apache.ignite.internal.util.GridUnsafe.putInt;
+import static org.apache.ignite.internal.util.GridUnsafe.putLong;
+import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
+import static org.apache.ignite.lang.IgniteSystemProperties.getFloat;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import java.util.function.BiConsumer;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.mem.IgniteOutOfMemoryException;
+import org.apache.ignite.internal.util.GridUnsafe;
+
+/**
+ * Loaded pages mapping to relative pointer based on Robin Hood hashing: backward shift deletion algorithm.
+ *
+ * <p>Performance of initial Robin Hood hashing could be greatly improved with only a little change to the removal method.
+ *
+ * <p>Instead of replacing entries with 'Removed' fake entries on deletion, backward shift deletion variant for the Robin Hood hashing
+ * algorithm does shift backward all the entries following the entry to delete until either an empty bucket, or a bucket with a DIB of 0
+ * (distance to initial bucket).
+ *
+ * <p>Every deletion will shift backwards entries and therefore decrease their respective DIBs by 1 (all their initial DIB values would be
+ * {@code >= 1}).
+ *
+ * <p>This implementation stores ideal bucket with entry value itself.
+ *
+ * <p>Memory layout: an 8-byte header (4-byte entries count followed by 4 bytes of padding), then the cells. Each cell takes 32 bytes:
+ * ideal bucket (4), group ID (4), page ID (8), value (8), version (4), padding (4).
+ */
+public class RobinHoodBackwardShiftHashMap implements LoadedPagesMap {
+ /** Long-long offheap map load factor. */
+ // TODO: IGNITE-16350 Move to config or something else.
+ public static final String IGNITE_LONG_LONG_HASH_MAP_LOAD_FACTOR = "IGNITE_LONG_LONG_HASH_MAP_LOAD_FACTOR";
+
+ /** Load factor: number of buckets allocated per requested element (see {@link #requiredMemory(long)}). */
+ private static final float LOAD_FACTOR = getFloat(IGNITE_LONG_LONG_HASH_MAP_LOAD_FACTOR, 2.5f);
+
+ /** Size of the entries count (value returned by size() method). */
+ private static final int MAPSIZE_SIZE = 4;
+
+ /** Padding to provide read/write from word beginning for each cell. Change this to 0 if padding is not required. */
+ private static final int CELL_PADDING = 4;
+
+ /** Padding to provide read/write from word beginning. Change this to 0 if padding is not required. */
+ private static final int MAPSIZE_PADDING = 4;
+
+ /** Count of entries offset starting from base address. */
+ private static final int MAPSIZE_OFFSET = 0;
+
+ /** Size of initial/ideal bucket (cell to store value to avoid probing other cells followed). */
+ private static final int IDEAL_BUCKET_SIZE = 4;
+
+ /** Offset of initial/ideal bucket starting from entry base. */
+ private static final int IDEAL_BUCKET_OFFSET = 0;
+
+ /** Group ID size. */
+ private static final int GRP_ID_SIZE = 4;
+
+ /** Group ID offset from entry base. */
+ private static final int GRP_ID_OFFSET = IDEAL_BUCKET_OFFSET + IDEAL_BUCKET_SIZE;
+
+ /** Page ID size. */
+ private static final int PAGE_ID_SIZE = 8;
+
+ /** Page ID offset from entry base. */
+ private static final int PAGE_ID_OFFSET = GRP_ID_OFFSET + GRP_ID_SIZE;
+
+ /** Value size. */
+ private static final int VALUE_SIZE = 8;
+
+ /** Value offset from entry base. */
+ private static final int VALUE_OFFSET = PAGE_ID_OFFSET + PAGE_ID_SIZE;
+
+ /** Version (tag/generation) size. */
+ private static final int VERSION_SIZE = 4;
+
+ /** Version (tag/generation) offset from entry base. */
+ private static final int VERSION_OFFSET = VALUE_OFFSET + VALUE_SIZE;
+
+ /** Page ID used for empty bucket. */
+ private static final long EMPTY_PAGE_ID = 0;
+
+ /** Group ID used for empty bucket. */
+ private static final int EMPTY_GRP_ID = 0;
+
+ /** Bytes required for storing one entry (cell). */
+ private static final int BYTES_PER_CELL = IDEAL_BUCKET_SIZE
+ + GRP_ID_SIZE + PAGE_ID_SIZE
+ + VALUE_SIZE + VERSION_SIZE
+ + CELL_PADDING;
+
+ /** Number of buckets, indicates range of scan memory, max probe count and maximum map size. */
+ private final int numBuckets;
+
+ /** Base address of map content. */
+ private long baseAddr;
+
+ /**
+ * Estimates the memory needed for a map that can hold {@code elementsCnt} entries: the element count is scaled by the
+ * load factor to get the number of buckets.
+ *
+ * @param elementsCnt Maximum elements can be stored in map, its maximum size.
+ * @return Required memory size, as computed by {@link #requiredMemoryByBuckets(long)}.
+ */
+ public static long requiredMemory(long elementsCnt) {
+ assert LOAD_FACTOR != 0;
+
+ long buckets = (long) (elementsCnt * LOAD_FACTOR);
+
+ return requiredMemoryByBuckets(buckets);
+ }
+
+ /**
+ * Computes the number of bytes to allocate for a map with the given number of buckets: the entries count header plus
+ * one cell per bucket.
+ *
+ * @param numBuckets Number of buckets (cells) to store, capacity.
+ * @return Required memory size in bytes.
+ */
+ static long requiredMemoryByBuckets(long numBuckets) {
+ long headerBytes = MAPSIZE_SIZE + MAPSIZE_PADDING;
+
+ return headerBytes + numBuckets * BYTES_PER_CELL;
+ }
+
+ /**
+ * Creates a map over a preallocated memory area and zero-fills the whole area.
+ *
+ * @param baseAddr Base buffer address.
+ * @param size Size in bytes available for the map; the number of buckets (cells) is derived from it.
+ */
+ public RobinHoodBackwardShiftHashMap(long baseAddr, long size) {
+ long cellsBytes = size - MAPSIZE_SIZE - MAPSIZE_PADDING;
+
+ this.numBuckets = (int) (cellsBytes / BYTES_PER_CELL);
+ this.baseAddr = baseAddr;
+
+ // Presumably zeroed cells read as empty buckets (EMPTY_GRP_ID and EMPTY_PAGE_ID are both 0).
+ GridUnsafe.setMemory(baseAddr, size, (byte) 0);
+ }
+
+ /**
+ * Returns the address of the cell (bucket) with the given index.
+ *
+ * @param idx Cell index.
+ */
+ private long entryBase(int idx) {
+ assert idx >= 0 && idx < numBuckets : "idx=" + idx + ", numBuckets=" + numBuckets;
+
+ long cellsStart = baseAddr + MAPSIZE_SIZE + MAPSIZE_PADDING;
+
+ return cellsStart + (long) idx * BYTES_PER_CELL;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long get(int grpId, long pageId, int reqVer, long absent, long outdated) {
+ assert grpId != EMPTY_GRP_ID;
+
+ // Probing starts at the ideal bucket of the searched key.
+ final int idealIdx = safeAbs(FullPageId.hashCode(grpId, pageId)) % numBuckets;
+
+ for (int probe = 0; probe < numBuckets; probe++) {
+ final int idx = (idealIdx + probe) % numBuckets;
+ final long base = entryBase(idx);
+
+ final int cellGrpId = getGrpId(base);
+ final long cellPageId = getPageId(base);
+
+ if (isEmpty(cellGrpId, cellPageId)) {
+ // An empty bucket ends the probe sequence: the key is not in the map.
+ return absent;
+ }
+
+ if (cellGrpId == grpId && cellPageId == pageId) {
+ // Key found: its value is returned only if the stored version is not older than requested.
+ return getVersion(base) >= reqVer ? getValue(base) : outdated;
+ }
+
+ // This entry is closer to its ideal bucket than the key would be here, so an insert of the key would have
+ // displaced it: the key cannot be further along.
+ if (distance(idx, getIdealBucket(base)) < distance(idx, idealIdx)) {
+ return absent;
+ }
+ }
+
+ return absent;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Probes from the key's ideal bucket while carrying an entry to place (initially the new key):
+ * an empty bucket receives the carried entry and the size is incremented; if the new key is already present, only its
+ * value is overwritten (the stored version is left unchanged) and an {@link IllegalStateException} is thrown if any swap
+ * happened before that; otherwise, when the bucket's entry has a smaller DIB than the computed one, the carried entry is
+ * stored there and the displaced entry becomes the carried one.
+ *
+ * @throws IgniteOutOfMemoryException If all buckets were probed without finding room.
+ */
+ @Override
+ public void put(int grpId, long pageId, long val, int ver) {
+ assert grpId != 0;
+
+ // initial index is also ideal for inserted element
+ int idxInit = safeAbs(FullPageId.hashCode(grpId, pageId)) % numBuckets;
+
+ int swapCnt = 0;
+
+ // State of the entry currently being carried to a free bucket.
+ int grpIdToInsert = grpId;
+ long pageIdToInsert = pageId;
+ long valToInsert = val;
+ int verToInsert = ver;
+ int idxIdealToInsert = idxInit;
+
+ for (int i = 0; i < numBuckets; i++) {
+ int idxCurr = (idxInit + i) % numBuckets;
+
+ final long base = entryBase(idxCurr);
+ // NOTE(review): measured from idxInit (the new key's ideal bucket) even after a swap, not from
+ // idxIdealToInsert of the carried entry; confirm this is intended.
+ final int dibEntryToInsert = distance(idxCurr, idxInit);
+
+ final int curGrpId = getGrpId(base);
+ final long curPageId = getPageId(base);
+ final int curIdealBucket = getIdealBucket(base);
+ final long curVal = getValue(base);
+ final int curVer = getVersion(base);
+ final int dibCurEntry = distance(idxCurr, curIdealBucket);
+
+ if (isEmpty(curGrpId, curPageId)) {
+ setCellValue(base, idxIdealToInsert, grpIdToInsert, pageIdToInsert, valToInsert, verToInsert);
+
+ setSize(size() + 1);
+
+ return;
+ } else if (curGrpId == grpIdToInsert && curPageId == pageIdToInsert) {
+ // An existing key must be met before any displacement happens.
+ if (swapCnt != 0) {
+ throw new IllegalStateException("Swapped " + swapCnt + " times. Entry: " + dumpEntry(idxCurr));
+ }
+
+ // NOTE(review): only the value is updated here; the stored version is kept as is.
+ setValue(base, valToInsert);
+
+ return; //equal value found
+ } else if (dibCurEntry < dibEntryToInsert) {
+ //swapping *toInsert and state in bucket: save cur state to bucket
+ setCellValue(base, idxIdealToInsert, grpIdToInsert, pageIdToInsert, valToInsert, verToInsert);
+
+ idxIdealToInsert = curIdealBucket;
+ pageIdToInsert = curPageId;
+ grpIdToInsert = curGrpId;
+ valToInsert = curVal;
+ verToInsert = curVer;
+
+ swapCnt++;
+ }
+ }
+
+ // no free space left
+ throw new IgniteOutOfMemoryException("No room for a new key");
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Probes from the key's ideal bucket until the key, an empty bucket, or an entry closer to its own ideal bucket is met.
+ * A found entry is removed with backward shift deletion and the size is decremented.
+ */
+ @Override
+ public boolean remove(int grpId, long pageId) {
+ assert grpId != EMPTY_GRP_ID;
+
+ int idxInit = safeAbs(FullPageId.hashCode(grpId, pageId)) % numBuckets;
+
+ int idxEqualValFound = -1;
+
+ for (int i = 0; i < numBuckets; i++) {
+ int idxCurr = (idxInit + i) % numBuckets;
+
+ final long base = entryBase(idxCurr);
+ final int distanceFromInit = distance(idxCurr, idxInit);
+
+ final int curGrpId = getGrpId(base);
+ final long curPageId = getPageId(base);
+ final int dibCurEntry = distance(idxCurr, getIdealBucket(base));
+
+ if (isEmpty(curGrpId, curPageId)) {
+ return false;
+ } else if (curGrpId == grpId && curPageId == pageId) {
+ idxEqualValFound = idxCurr;
+
+ break; //equal value found
+ } else if (dibCurEntry < distanceFromInit) {
+ //If our value was present in map we had already found it.
+ return false;
+ }
+ }
+
+ // A completely full map can be probed end to end without meeting the key or a stop condition. The key is then
+ // absent, and neither the size nor any cell must be touched (doBackwardShift(-1) would address memory out of range).
+ if (idxEqualValFound < 0) {
+ return false;
+ }
+
+ setSize(size() - 1);
+
+ doBackwardShift(idxEqualValFound);
+
+ return true;
+ }
+
+ /**
+ * Removes the entry at {@code idxRmv} by backward shift deletion: every following entry is moved one bucket back until
+ * an empty bucket or an entry already in its ideal bucket (DIB 0) is reached, and the last vacated cell is cleared.
+ *
+ * <p>If all other entries were shifted without meeting a stop condition, the cell just before {@code idxRmv} (modulo
+ * {@code numBuckets}) is cleared. The entries count is not changed here; callers update it.
+ *
+ * @param idxRmv Index of the removed entry; must be non-negative.
+ */
+ private void doBackwardShift(int idxRmv) {
+ assert idxRmv >= 0;
+
+ //scanning rest of map to perform backward shifts
+ for (int i = 0; i < numBuckets - 1; i++) {
+ int idxCurr = (idxRmv + i) % numBuckets;
+ int idxNext = (idxRmv + i + 1) % numBuckets;
+
+ long baseCurr = entryBase(idxCurr);
+
+ long baseNext = entryBase(idxNext);
+ final int nextGrpId = getGrpId(baseNext);
+ final long nextPageId = getPageId(baseNext);
+ final int nextIdealBucket = getIdealBucket(baseNext);
+ final int nextEntryVer = getVersion(baseNext);
+
+ if (isEmpty(nextGrpId, nextPageId)
+ || distance(idxNext, nextIdealBucket) == 0) {
+ // The next entry must stay where it is, so the current cell becomes the hole.
+ setEmpty(baseCurr);
+
+ return;
+ } else {
+ // Move the next entry one bucket back, decreasing its DIB by 1.
+ setCellValue(baseCurr, nextIdealBucket, nextGrpId, nextPageId, getValue(baseNext), nextEntryVer);
+ }
+ }
+
+ // Wrapped around the whole map: the cell right before idxRmv is the last shifted one.
+ int lastShiftedIdx = (idxRmv - 1) % numBuckets;
+
+ if (lastShiftedIdx < 0) {
+ lastShiftedIdx += numBuckets;
+ }
+
+ setEmpty(entryBase(lastShiftedIdx));
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Walks at most one full circle starting at {@code idxStart} and returns the first occupied cell, or {@code null}
+ * if every cell is empty.
+ */
+ @Override
+ public ReplaceCandidate getNearestAt(final int idxStart) {
+ for (int offset = 0; offset < numBuckets; offset++) {
+ int idx = (idxStart + offset) % numBuckets;
+
+ if (!isEmptyAt(idx)) {
+ long cell = entryBase(idx);
+
+ return new ReplaceCandidate(getVersion(cell), getValue(cell), getFullPageId(cell));
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Checks whether the cell with the given index holds no entry.
+ *
+ * @param idx Index to test.
+ * @return {@code true} if the cell is empty.
+ */
+ private boolean isEmptyAt(int idx) {
+ final long cell = entryBase(idx);
+ final int cellGrpId = getGrpId(cell);
+ final long cellPageId = getPageId(cell);
+
+ return isEmpty(cellGrpId, cellPageId);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Only an existing outdated entry (stored version lower than {@code ver}) can be refreshed: its version is set to
+ * {@code ver} and its value is returned. A fresh entry or a missing key results in {@link IllegalArgumentException}.
+ */
+ @Override
+ public long refresh(int grpId, long pageId, int ver) {
+ assert grpId != EMPTY_GRP_ID;
+
+ final int idealIdx = safeAbs(FullPageId.hashCode(grpId, pageId)) % numBuckets;
+
+ for (int probe = 0; probe < numBuckets; probe++) {
+ final int idx = (idealIdx + probe) % numBuckets;
+ final long base = entryBase(idx);
+
+ final int cellGrpId = getGrpId(base);
+ final long cellPageId = getPageId(base);
+
+ if (isEmpty(cellGrpId, cellPageId)) {
+ break;
+ }
+
+ if (cellGrpId == grpId && cellPageId == pageId) {
+ if (getVersion(base) >= ver) {
+ throw new IllegalArgumentException("Fresh element found at "
+ + dumpEntry(idx) + " during search of cell to refresh. "
+ + "Refresh should be called for existent outdated element. ");
+ }
+
+ setVersion(base, ver);
+
+ return getValue(base);
+ }
+
+ // The entry is closer to its ideal bucket than the key would be here: the key is absent.
+ if (distance(idx, getIdealBucket(base)) < distance(idx, idealIdx)) {
+ break;
+ }
+ }
+
+ throw new IllegalArgumentException("Element not found group ID: " + grpId + ", page ID: " + pageId
+ + " during search of cell to refresh. Refresh should be called for existent outdated element. ");
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Scans buckets in {@code [startIdxToClear, endIdxToClear)}. Each matching entry is removed with a backward shift,
+ * and the same bucket is examined again because the shift may have moved another entry into it.
+ *
+ * @return Values of the removed entries, in removal order.
+ */
+ @Override
+ public LongArrayList removeIf(int startIdxToClear, int endIdxToClear, KeyPredicate keyPred) {
+ assert endIdxToClear >= startIdxToClear
+ : "Start and end indexes are not consistent: {" + startIdxToClear + ", " + endIdxToClear + "}";
+
+ LongArrayList removed = new LongArrayList(endIdxToClear - startIdxToClear);
+
+ int idx = startIdxToClear;
+
+ while (idx < endIdxToClear) {
+ long entryAddr = entryBase(idx);
+
+ int grpId = getGrpId(entryAddr);
+ long pageId = getPageId(entryAddr);
+
+ if (isEmpty(grpId, pageId) || !keyPred.test(grpId, pageId)) {
+ // Absent or non-matching entry: nothing to remove, advance to the next bucket.
+ idx++;
+
+ continue;
+ }
+
+ long removedVal = getValue(entryAddr);
+
+ setSize(size() - 1);
+
+ doBackwardShift(idx);
+
+ removed.add(removedVal);
+
+ // idx is deliberately not advanced: the backward shift may have pulled the next entry into this bucket.
+ }
+
+ return removed;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Equals the number of buckets in the table.
+ */
+ @Override
+ public int capacity() {
+ final int buckets = numBuckets;
+
+ return buckets;
+ }
+
+ /**
+ * Computes how many buckets forward (wrapping around the table) {@code curr} lies from {@code baseIdx}.
+ *
+ * @param curr Currently selected bucket index to store the value.
+ * @param baseIdx Base (ideal) bucket index of the entry.
+ * @return Forward distance; {@code 0} when {@code curr} is the ideal bucket.
+ */
+ private int distance(int curr, int baseIdx) {
+ int diff = curr - baseIdx;
+
+ return diff < 0 ? diff + numBuckets : diff;
+ }
+
+ /**
+ * Checks whether the given key is the "empty cell" marker.
+ *
+ * @param grpId Group ID.
+ * @param pageId Page ID.
+ * @return {@code true} if both components equal their empty markers.
+ */
+ private boolean isEmpty(int grpId, long pageId) {
+ return grpId == EMPTY_GRP_ID && pageId == EMPTY_PAGE_ID;
+ }
+
+ /**
+ * Resets a bucket to the empty state: empty key markers, zero value, zero ideal bucket and zero version.
+ *
+ * @param addr Entry base address.
+ */
+ private void setEmpty(long addr) {
+ setCellValue(addr, 0, EMPTY_GRP_ID, EMPTY_PAGE_ID, 0, 0);
+ }
+
+ /**
+ * Stores the index of the bucket in which this entry would ideally live.
+ *
+ * @param base Entry base address in the buffer.
+ * @param idxIdeal Ideal bucket index, must be within {@code [0, numBuckets)}.
+ */
+ private void setIdealBucket(long base, int idxIdeal) {
+ assert 0 <= idxIdeal && idxIdeal < numBuckets;
+
+ long addr = base + IDEAL_BUCKET_OFFSET;
+
+ putInt(addr, idxIdeal);
+ }
+
+ /**
+ * Returns printable dump with all buckets state.
+ */
+ public String dump() {
+ StringBuilder sb = new StringBuilder();
+
+ for (int idx = 0; idx < numBuckets; idx++) {
+ dumpEntry(sb, idx);
+ }
+
+ return sb.toString();
+ }
+
+ /**
+ * Returns string representation of bucket content.
+ *
+ * @param idx index of entry to dump
+ */
+ private String dumpEntry(int idx) {
+ StringBuilder sb = new StringBuilder();
+
+ dumpEntry(sb, idx);
+
+ return sb.toString();
+ }
+
+ /**
+ * Appends a one-line description of a bucket: its index, an "Empty" marker if applicable, the ideal bucket,
+ * the key (group/page) and the value/version pair.
+ *
+ * @param sb Destination builder.
+ * @param idx Bucket index.
+ */
+ private void dumpEntry(StringBuilder sb, int idx) {
+ long entryAddr = entryBase(idx);
+
+ int grpId = getGrpId(entryAddr);
+ long pageId = getPageId(entryAddr);
+
+ sb.append("slot [").append(idx).append("]:");
+
+ if (isEmpty(grpId, pageId)) {
+ sb.append("Empty: ");
+ }
+
+ sb.append("i.buc=").append(getIdealBucket(entryAddr)).append(",")
+ .append("(grp=").append(grpId).append(",")
+ .append("page=").append(pageId).append(")")
+ .append("->")
+ .append("(val=").append(getValue(entryAddr)).append(",")
+ .append("ver=").append(getVersion(entryAddr)).append(")")
+ .append("\n");
+ }
+
+ /**
+ * Writes a complete entry into a bucket.
+ *
+ * @param base Entry base address in the buffer.
+ * @param idealBucket Ideal bucket index for this entry.
+ * @param grpId Key part: group ID.
+ * @param pageId Key part: page ID.
+ * @param val Value associated with the key.
+ * @param ver Entry version.
+ */
+ private void setCellValue(long base, int idealBucket, int grpId, long pageId, long val, int ver) {
+ // Ideal bucket first: it carries the range assertion, which should fire before anything else is written.
+ setIdealBucket(base, idealBucket);
+
+ setPageId(base, pageId);
+ setGrpId(base, grpId);
+ setVersion(base, ver);
+ setValue(base, val);
+ }
+
+ /**
+ * Reads the ideal bucket index stored in an entry.
+ *
+ * @param base Entry base address.
+ * @return Ideal bucket index.
+ */
+ private int getIdealBucket(long base) {
+ long addr = base + IDEAL_BUCKET_OFFSET;
+
+ return getInt(addr);
+ }
+
+ /**
+ * Reads the page ID part of an entry's key.
+ *
+ * @param base Entry base address.
+ * @return Stored page ID.
+ */
+ private long getPageId(long base) {
+ long addr = base + PAGE_ID_OFFSET;
+
+ return getLong(addr);
+ }
+
+ /**
+ * Writes the page ID part of an entry's key.
+ *
+ * @param base Entry base address.
+ * @param pageId Page ID to store.
+ */
+ private void setPageId(long base, long pageId) {
+ long addr = base + PAGE_ID_OFFSET;
+
+ putLong(addr, pageId);
+ }
+
+ /**
+ * Reads the group ID part of an entry's key.
+ *
+ * @param base Entry base address.
+ * @return Stored group ID.
+ */
+ private int getGrpId(long base) {
+ long addr = base + GRP_ID_OFFSET;
+
+ return getInt(addr);
+ }
+
+ /**
+ * Writes the group ID part of an entry's key.
+ *
+ * @param base Entry base address.
+ * @param grpId Group ID to store.
+ */
+ private void setGrpId(long base, int grpId) {
+ long addr = base + GRP_ID_OFFSET;
+
+ putInt(addr, grpId);
+ }
+
+ /**
+ * Reads the value stored in a bucket.
+ *
+ * @param base Bucket base address.
+ * @return Stored value.
+ */
+ private long getValue(long base) {
+ long addr = base + VALUE_OFFSET;
+
+ return getLong(addr);
+ }
+
+ /**
+ * Writes the value of a bucket.
+ *
+ * @param base Bucket base address.
+ * @param val Value to store.
+ */
+ private void setValue(long base, long val) {
+ long addr = base + VALUE_OFFSET;
+
+ putLong(addr, val);
+ }
+
+ /**
+ * Reads the entry version stored in a bucket.
+ *
+ * @param base Bucket base address.
+ * @return Stored version.
+ */
+ private int getVersion(long base) {
+ long addr = base + VERSION_OFFSET;
+
+ return getInt(addr);
+ }
+
+ /**
+ * Writes the entry version of a bucket.
+ *
+ * @param base Bucket base address.
+ * @param ver Version to store.
+ */
+ private void setVersion(long base, int ver) {
+ long addr = base + VERSION_OFFSET;
+
+ putInt(addr, ver);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public final int size() {
+ return getInt(baseAddr + MAPSIZE_OFFSET);
+ }
+
+ /**
+ * Overwrites the stored number of entries.
+ *
+ * @param sz New size.
+ */
+ private void setSize(int sz) {
+ long addr = baseAddr + MAPSIZE_OFFSET;
+
+ putInt(addr, sz);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Visits occupied buckets in index order, passing each key and its value to {@code act}.
+ */
+ @Override
+ public void forEach(BiConsumer<FullPageId, Long> act) {
+ for (int idx = 0; idx < numBuckets; idx++) {
+ if (!isEmptyAt(idx)) {
+ long entryAddr = entryBase(idx);
+
+ act.accept(getFullPageId(entryAddr), getValue(entryAddr));
+ }
+ }
+ }
+
+ /**
+ * Reads an entry's key as a {@link FullPageId}.
+ *
+ * @param base Bucket base address.
+ * @return Key stored in the bucket.
+ */
+ private FullPageId getFullPageId(long base) {
+ long pageId = getPageId(base);
+ int grpId = getGrpId(base);
+
+ return new FullPageId(pageId, grpId);
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/SegmentedLruPageList.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/SegmentedLruPageList.java
new file mode 100644
index 0000000..c37cee4
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/SegmentedLruPageList.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static org.apache.ignite.internal.util.GridUnsafe.getInt;
+import static org.apache.ignite.internal.util.GridUnsafe.getLong;
+import static org.apache.ignite.internal.util.GridUnsafe.putInt;
+import static org.apache.ignite.internal.util.GridUnsafe.putLong;
+
+import org.apache.ignite.internal.util.GridUnsafe;
+
+/**
+ * Pages Segmented-LRU (SLRU) list implementation.
+ */
+public class SegmentedLruPageList {
+ /** Ratio to limit count of protected pages. */
+ private static final double PROTECTED_TO_TOTAL_PAGES_RATIO = 0.5;
+
+ /** Null page index. */
+ static final int NULL_IDX = -1;
+
+ /** Index of the head page of LRU list. */
+ private int headIdx = NULL_IDX;
+
+ /** Index of the tail page of LRU list. */
+ private int tailIdx = NULL_IDX;
+
+ /** Index of the tail page of probationary segment. */
+ private int probTailIdx = NULL_IDX;
+
+ /** Count of protected pages in the list. */
+ private int protectedPagesCnt;
+
+ /** Protected pages segment limit. */
+ private final int protectedPagesLimit;
+
+ /** Pointer to memory region to store links. */
+ private final long linksPtr;
+
+ /** Pointer to memory region to store protected flags. */
+ private final long flagsPtr;
+
+ /**
+ * Constructor.
+ *
+ * @param totalPagesCnt Total pages count.
+ * @param memPtr Pointer to memory region.
+ */
+ public SegmentedLruPageList(int totalPagesCnt, long memPtr) {
+ linksPtr = memPtr;
+ flagsPtr = memPtr + (((long) totalPagesCnt) << 3);
+
+ GridUnsafe.setMemory(linksPtr, ((long) totalPagesCnt) << 3, (byte) 0xFF);
+ GridUnsafe.setMemory(flagsPtr, (totalPagesCnt + 7) >> 3, (byte) 0);
+
+ protectedPagesLimit = (int) (totalPagesCnt * PROTECTED_TO_TOTAL_PAGES_RATIO);
+ }
+
+ /**
+ * Remove page from the head of LRU list.
+ *
+ * @return Page index or {@code -1} if list is empty.
+ */
+ public synchronized int poll() {
+ int idx = headIdx;
+
+ if (idx != NULL_IDX) {
+ remove(idx);
+ }
+
+ return idx;
+ }
+
+ /**
+ * Remove page from LRU list by page index.
+ *
+ * @param pageIdx Page index.
+ */
+ public synchronized void remove(int pageIdx) {
+ remove0(pageIdx, protectedPage(pageIdx));
+ }
+
+ private void remove0(int pageIdx, boolean clearProtectedFlag) {
+ assert pageIdx != NULL_IDX;
+
+ int prevIdx = prev(pageIdx);
+ int nextIdx = next(pageIdx);
+
+ if (pageIdx == probTailIdx) {
+ probTailIdx = prevIdx;
+ }
+
+ if (prevIdx == NULL_IDX) {
+ assert headIdx == pageIdx : "Unexpected LRU page index [headIdx=" + headIdx + ", pageIdx=" + pageIdx + ']';
+
+ headIdx = nextIdx;
+ } else {
+ next(prevIdx, nextIdx);
+ }
+
+ if (nextIdx == NULL_IDX) {
+ assert tailIdx == pageIdx : "Unexpected LRU page index [tailIdx=" + tailIdx + ", pageIdx=" + pageIdx + ']';
+
+ tailIdx = prevIdx;
+ } else {
+ prev(nextIdx, prevIdx);
+ }
+
+ clearLinks(pageIdx);
+
+ if (clearProtectedFlag) {
+ protectedPagesCnt--;
+
+ protectedPage(pageIdx, false);
+ }
+ }
+
+ /**
+ * Add page to the tail of protected or probationary LRU list.
+ *
+ * @param pageIdx Page index.
+ * @param protectedPage Protected page flag.
+ */
+ public synchronized void addToTail(int pageIdx, boolean protectedPage) {
+ assert prev(pageIdx) == NULL_IDX : prev(pageIdx);
+ assert next(pageIdx) == NULL_IDX : next(pageIdx);
+
+ if (headIdx == NULL_IDX || tailIdx == NULL_IDX) {
+ // In case of empty list.
+ assert headIdx == NULL_IDX : headIdx;
+ assert tailIdx == NULL_IDX : tailIdx;
+ assert probTailIdx == NULL_IDX : probTailIdx;
+ assert protectedPagesCnt == 0 : protectedPagesCnt;
+
+ headIdx = pageIdx;
+ tailIdx = pageIdx;
+
+ if (protectedPage) {
+ protectedPagesCnt = 1;
+
+ protectedPage(pageIdx, true);
+ } else {
+ probTailIdx = pageIdx;
+ }
+
+ return;
+ }
+
+ if (protectedPage) {
+ // Protected page - insert to the list tail.
+ assert next(tailIdx) == NULL_IDX : "Unexpected LRU page index [pageIdx=" + pageIdx
+ + ", tailIdx=" + tailIdx + ", nextLruIdx=" + next(tailIdx) + ']';
+
+ link(tailIdx, pageIdx);
+
+ tailIdx = pageIdx;
+
+ protectedPage(pageIdx, true);
+
+ // Move one page from protected segment to probationary segment if there are too many protected pages.
+ if (protectedPagesCnt >= protectedPagesLimit) {
+ probTailIdx = probTailIdx != NULL_IDX ? next(probTailIdx) : headIdx;
+
+ assert probTailIdx != NULL_IDX;
+
+ protectedPage(probTailIdx, false);
+ } else {
+ protectedPagesCnt++;
+ }
+ } else {
+ if (probTailIdx == NULL_IDX) {
+ // First page in the probationary list - insert to the head.
+ assert prev(headIdx) == NULL_IDX : "Unexpected LRU page index [pageIdx=" + pageIdx
+ + ", headIdx=" + headIdx + ", prevLruIdx=" + prev(headIdx) + ']';
+
+ link(pageIdx, headIdx);
+
+ headIdx = pageIdx;
+ } else {
+ int protectedIdx = next(probTailIdx);
+
+ link(probTailIdx, pageIdx);
+
+ if (protectedIdx == NULL_IDX) {
+ // There are no protected pages in the list.
+ assert probTailIdx == tailIdx :
+ "Unexpected LRU page index [probTailIdx=" + probTailIdx + ", tailIdx=" + tailIdx + ']';
+
+ tailIdx = pageIdx;
+ } else {
+ // Link with last protected page.
+ link(pageIdx, protectedIdx);
+ }
+ }
+
+ probTailIdx = pageIdx;
+ }
+ }
+
+ /**
+ * Move page to the tail of protected LRU list.
+ *
+ * @param pageIdx Page index.
+ */
+ public synchronized void moveToTail(int pageIdx) {
+ if (tailIdx == pageIdx) {
+ return;
+ }
+
+ remove0(pageIdx, false);
+
+ if (protectedPage(pageIdx)) {
+ link(tailIdx, pageIdx);
+
+ tailIdx = pageIdx;
+ } else {
+ addToTail(pageIdx, true);
+ }
+ }
+
+ /**
+ * Link two pages.
+ *
+ * @param prevIdx Previous page index.
+ * @param nextIdx Next page index.
+ */
+ private void link(int prevIdx, int nextIdx) {
+ prev(nextIdx, prevIdx);
+ next(prevIdx, nextIdx);
+ }
+
+ /**
+ * Clear page links.
+ *
+ * @param pageIdx Page index.
+ */
+ private void clearLinks(int pageIdx) {
+ putLong(linksPtr + (((long) pageIdx) << 3), -1L);
+ }
+
+ /**
+ * Gets link to the previous page in the list.
+ *
+ * @param pageIdx Page index.
+ */
+ int prev(int pageIdx) {
+ return getInt(linksPtr + (((long) pageIdx) << 3));
+ }
+
+ /**
+ * Sets link to the previous page in the list.
+ *
+ * @param pageIdx Page index.
+ * @param prevIdx Previous page index.
+ */
+ private void prev(int pageIdx, int prevIdx) {
+ putInt(linksPtr + (((long) pageIdx) << 3), prevIdx);
+ }
+
+ /**
+ * Gets link to the next page in the list.
+ *
+ * @param pageIdx Page index.
+ */
+ int next(int pageIdx) {
+ return getInt(linksPtr + (((long) pageIdx) << 3) + 4);
+ }
+
+ /**
+ * Sets link to the next page in the list.
+ *
+ * @param pageIdx Page index.
+ * @param nextIdx Next page index.
+ */
+ private void next(int pageIdx, int nextIdx) {
+ putInt(linksPtr + (((long) pageIdx) << 3) + 4, nextIdx);
+ }
+
+ /**
+ * Gets protected page flag.
+ *
+ * @param pageIdx Page index.
+ */
+ boolean protectedPage(int pageIdx) {
+ long flags = getLong(flagsPtr + ((pageIdx >> 3) & (~7)));
+
+ return (flags & (1L << pageIdx)) != 0L;
+ }
+
+ /**
+ * Sets protected page flag.
+ *
+ * @param pageIdx Page index.
+ * @param protectedPage Protected page flag.
+ */
+ private void protectedPage(int pageIdx, boolean protectedPage) {
+ long ptr = flagsPtr + ((pageIdx >> 3) & (~7));
+
+ if (protectedPage) {
+ putLong(ptr, getLong(ptr) | (1L << pageIdx));
+ } else {
+ putLong(ptr, getLong(ptr) & ~(1L << pageIdx));
+ }
+ }
+
+ /**
+ * Gets the index of the head page of LRU list.
+ */
+ synchronized int headIdx() {
+ return headIdx;
+ }
+
+ /**
+ * Gets the indexof the tail page of probationary segment.
+ */
+ synchronized int probTailIdx() {
+ return probTailIdx;
+ }
+
+ /**
+ * Gets the index of the tail page of LRU list.
+ */
+ synchronized int tailIdx() {
+ return tailIdx;
+ }
+
+ /**
+ * Gets protected pages count.
+ */
+ synchronized int protectedPagesCount() {
+ return protectedPagesCnt;
+ }
+
+ /**
+ * Gets protected pages limit.
+ */
+ int protectedPagesLimit() {
+ return protectedPagesLimit;
+ }
+
+ /**
+ * Memory required to service {@code pagesCnt} pages.
+ *
+ * @param pagesCnt Pages count.
+ */
+ public static long requiredMemory(int pagesCnt) {
+ return pagesCnt * 8 /* links = 2 ints per page */
+ + ((pagesCnt + 63) / 8) & (~7L) /* protected flags = 1 bit per page + 8 byte align */;
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/SegmentedLruPageReplacementPolicy.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/SegmentedLruPageReplacementPolicy.java
new file mode 100644
index 0000000..247830f
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/SegmentedLruPageReplacementPolicy.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.fullPageId;
+import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.INVALID_REL_PTR;
+import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.OUTDATED_REL_PTR;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.Segment;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * Segmented-LRU page replacement policy implementation.
+ */
+public class SegmentedLruPageReplacementPolicy extends PageReplacementPolicy {
+ /** LRU list. */
+ private final SegmentedLruPageList lruList;
+
+ /**
+ * Constructor.
+ *
+ * @param seg Page memory segment.
+ * @param ptr Pointer to memory region.
+ * @param pagesCnt Pages count.
+ */
+ protected SegmentedLruPageReplacementPolicy(Segment seg, long ptr, int pagesCnt) {
+ super(seg);
+
+ lruList = new SegmentedLruPageList(pagesCnt, ptr);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onHit(long relPtr) {
+ int pageIdx = (int) seg.pageIndex(relPtr);
+
+ lruList.moveToTail(pageIdx);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onMiss(long relPtr) {
+ int pageIdx = (int) seg.pageIndex(relPtr);
+
+ lruList.addToTail(pageIdx, false);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onRemove(long relPtr) {
+ int pageIdx = (int) seg.pageIndex(relPtr);
+
+ lruList.remove(pageIdx);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long replace() throws IgniteInternalCheckedException {
+ LoadedPagesMap loadedPages = seg.loadedPages();
+
+ for (int i = 0; i < loadedPages.size(); i++) {
+ int pageIdx = lruList.poll();
+
+ long relPtr = seg.relative(pageIdx);
+ long absPtr = seg.absolute(relPtr);
+
+ FullPageId fullId = fullPageId(absPtr);
+
+ // Check loaded pages map for outdated page.
+ relPtr = loadedPages.get(
+ fullId.groupId(),
+ fullId.effectivePageId(),
+ seg.partGeneration(fullId.groupId(), partitionId(fullId.pageId())),
+ INVALID_REL_PTR,
+ OUTDATED_REL_PTR
+ );
+
+ assert relPtr != INVALID_REL_PTR;
+
+ if (relPtr == OUTDATED_REL_PTR) {
+ return seg.refreshOutdatedPage(fullId.groupId(), fullId.pageId(), true);
+ }
+
+ if (seg.tryToRemovePage(fullId, absPtr)) {
+ return relPtr;
+ }
+
+ // Return page to the LRU list.
+ lruList.addToTail(pageIdx, true);
+ }
+
+ throw seg.oomException("no pages to replace");
+ }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/SegmentedLruPageReplacementPolicyFactory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/SegmentedLruPageReplacementPolicyFactory.java
new file mode 100644
index 0000000..4e1234a
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/SegmentedLruPageReplacementPolicyFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+/**
+ * {@link SegmentedLruPageReplacementPolicy} factory.
+ */
+public class SegmentedLruPageReplacementPolicyFactory implements PageReplacementPolicyFactory {
+ /** {@inheritDoc} */
+ @Override
+ public long requiredMemory(int pagesCnt) {
+ return SegmentedLruPageList.requiredMemory(pagesCnt);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public PageReplacementPolicy create(PageMemoryImpl.Segment seg, long ptr, int pagesCnt) {
+ return new SegmentedLruPageReplacementPolicy(seg, ptr, pagesCnt);
+ }
+}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoLoadSelfTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoLoadSelfTest.java
index b607eb9..2ba701b 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoLoadSelfTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoLoadSelfTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.pagememory.impl;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.internal.configuration.ConfigurationTestUtils.fixConfiguration;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -58,7 +59,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
public class PageMemoryNoLoadSelfTest extends BaseIgniteAbstractTest {
protected static final int PAGE_SIZE = 8 * 1024;
- private static final int MAX_MEMORY_SIZE = 10 * 1024 * 1024;
+ protected static final int MAX_MEMORY_SIZE = 10 * 1024 * 1024;
private static final PageIo PAGE_IO = new TestPageIo();
@@ -68,7 +69,7 @@ public class PageMemoryNoLoadSelfTest extends BaseIgniteAbstractTest {
PageMemoryDataRegionConfigurationSchema.class,
UnsafeMemoryAllocatorConfigurationSchema.class
})
- private DataRegionConfiguration dataRegionCfg;
+ protected DataRegionConfiguration dataRegionCfg;
@Test
public void testPageTearingInner() throws Exception {
@@ -315,7 +316,7 @@ public class PageMemoryNoLoadSelfTest extends BaseIgniteAbstractTest {
.changePageSize(PAGE_SIZE)
.changeInitSize(MAX_MEMORY_SIZE)
.changeMaxSize(MAX_MEMORY_SIZE)
- );
+ ).get(1, SECONDS);
DirectMemoryProvider provider = new UnsafeMemoryProvider(null);
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImplNoLoadTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImplNoLoadTest.java
new file mode 100644
index 0000000..42b21ee
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImplNoLoadTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.configuration.ConfigurationTestUtils.fixConfiguration;
+import static org.apache.ignite.internal.util.Constants.MiB;
+
+import java.util.stream.LongStream;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionChange;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfiguration;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.impl.PageMemoryNoLoadSelfTest;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.pagememory.mem.unsafe.UnsafeMemoryProvider;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests {@link PageMemoryImpl}.
+ */
+public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest {
+ /** {@inheritDoc} */
+ @Override
+ protected PageMemory memory() throws Exception {
+ dataRegionCfg.change(cfg ->
+ cfg.convert(PageMemoryDataRegionChange.class)
+ .changePageSize(PAGE_SIZE)
+ .changeInitSize(MAX_MEMORY_SIZE)
+ .changeMaxSize(MAX_MEMORY_SIZE)
+ ).get(1, SECONDS);
+
+ PageIoRegistry ioRegistry = new PageIoRegistry();
+
+ ioRegistry.loadFromServiceLoader();
+
+ return new PageMemoryImpl(
+ new UnsafeMemoryProvider(null),
+ (PageMemoryDataRegionConfiguration) fixConfiguration(dataRegionCfg),
+ ioRegistry,
+ LongStream.range(0, 10).map(i -> 5 * MiB).toArray(),
+ new TestPageReadWriteManager(),
+ (page, fullPageId, pageMemoryEx) -> {
+ }
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Test
+ @Override
+ public void testPageHandleDeallocation() {
+ // No-op.
+ }
+}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/TestPageReadWriteManager.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/TestPageReadWriteManager.java
new file mode 100644
index 0000000..9aa7883
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/TestPageReadWriteManager.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.pagememory.FullPageId;
+
+/**
+ * Implementation for tests.
+ */
+public class TestPageReadWriteManager implements PageReadWriteManager {
+ private final ConcurrentMap<FullPageId, AtomicInteger> allocators = new ConcurrentHashMap<>();
+
+ /** {@inheritDoc} */
+ @Override
+ public void read(int grpId, long pageId, ByteBuffer pageBuf, boolean keepCrc) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void write(int grpId, long pageId, ByteBuffer pageBuf, int tag, boolean calculateCrc) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long allocatePage(int grpId, int partId, byte flags) {
+ long root = pageId(partId, flags, 0);
+
+ FullPageId fullId = new FullPageId(root, grpId);
+
+ AtomicInteger allocator = allocators.computeIfAbsent(fullId, fullPageId -> new AtomicInteger(1));
+
+ return pageId(partId, flags, allocator.getAndIncrement());
+ }
+}