You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/02/16 15:14:12 UTC

[GitHub] [ignite-3] tkalkirill opened a new pull request #667: IGNITE-16560 [Native Persistence 3.0] PageMemoryImpl porting

tkalkirill opened a new pull request #667:
URL: https://github.com/apache/ignite-3/pull/667


   https://issues.apache.org/jira/browse/IGNITE-16560


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] tkalkirill commented on a change in pull request #667: IGNITE-16560 [Native Persistence 3.0] PageMemoryImpl porting

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on a change in pull request #667:
URL: https://github.com/apache/ignite-3/pull/667#discussion_r816817172



##########
File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/GroupPartitionId.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Pair of group ID with partition ID. Immutable, comparable class, may be used as key in maps.
+ */
+public class GroupPartitionId implements Comparable<GroupPartitionId> {
+    /** Index for super(meta) page. There is always such page for iterated cache partition. */
+    private static final int METAPAGE_IDX = 0;
+
+    /** Group ID. */
+    private final int grpId;
+
+    /** Partition ID. */
+    private final int partId;
+
+    /**
+     * Creates group-partition tuple.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     */
+    public GroupPartitionId(final int grpId, final int partId) {
+        this.grpId = grpId;
+        this.partId = partId;
+    }
+
+    /**
+     * Returns flag to be used for partition.
+     *
+     * @param partId Partition ID.
+     */
+    public static byte getFlagByPartId(final int partId) {
+        return partId == INDEX_PARTITION ? FLAG_AUX : FLAG_DATA;
+    }
+
+    /**
+     * Return group ID.
+     */
+    public int getGroupId() {
+        return grpId;
+    }
+
+    /**
+     * Return partition ID.
+     */
+    public int getPartitionId() {
+        return partId;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString() {
+        return S.toString(GroupPartitionId.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        GroupPartitionId key = (GroupPartitionId) o;
+
+        if (grpId != key.grpId) {
+            return false;
+        }
+
+        return partId == key.partId;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int hashCode() {
+        int result = grpId;
+
+        result = 31 * result + partId;
+
+        return result;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int compareTo(GroupPartitionId o) {
+        if (getGroupId() < o.getGroupId()) {
+            return -1;
+        }
+
+        if (getGroupId() > o.getGroupId()) {
+            return 1;
+        }
+
+        if (getPartitionId() < o.getPartitionId()) {
+            return -1;
+        }
+
+        if (getPartitionId() > o.getPartitionId()) {
+            return 1;
+        }
+
+        return 0;
+    }
+
+    /**
+     * Returns page ID (64 bits) constructed from partition ID and given index.
+     *
+     * @param pageIdx Page Index, monotonically growing number within each partition.
+     */
+    private long createPageId(final int pageIdx) {
+        final int partId = getPartitionId();
+
+        return pageId(partId, getFlagByPartId(partId), pageIdx);
+    }
+
+    /**
+     * Returns Full page ID. For index 0 will return super-page of next partition
+     *
+     * @param pageIdx Page Index, monotonically growing number within each partition
+     * @return FullPageId consists of cache ID (32 bits) and page ID (64 bits).
+     */
+    private FullPageId createFullPageId(final int pageIdx) {
+        return new FullPageId(createPageId(pageIdx), getGroupId());

Review comment:
       The code is not needed, so I got rid of it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] tkalkirill commented on a change in pull request #667: IGNITE-16560 [Native Persistence 3.0] PageMemoryImpl porting

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on a change in pull request #667:
URL: https://github.com/apache/ignite-3/pull/667#discussion_r816819955



##########
File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementFlags.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapLong;
+import static org.apache.ignite.internal.util.GridUnsafe.getLong;
+import static org.apache.ignite.internal.util.GridUnsafe.putLong;
+import static org.apache.ignite.internal.util.GridUnsafe.setMemory;
+
+import java.util.function.LongUnaryOperator;
+
+/**
+ * Clock page replacement algorithm implementation.
+ */
+public class ClockPageReplacementFlags {
+    /** Total pages count. */
+    private final int pagesCnt;
+
+    /** Index of the next candidate ("hand"). */
+    private int curIdx;
+
+    /** Pointer to memory region to store page hit flags. */
+    private final long flagsPtr;
+
+    /**
+     * Constructor.
+     *
+     * @param totalPagesCnt Total pages count.
+     * @param memPtr Pointer to memory region.
+     */
+    ClockPageReplacementFlags(int totalPagesCnt, long memPtr) {
+        pagesCnt = totalPagesCnt;
+        flagsPtr = memPtr;
+
+        setMemory(flagsPtr, (totalPagesCnt + 7) >> 3, (byte) 0);

Review comment:
       Added a TODO.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] tkalkirill commented on a change in pull request #667: IGNITE-16560 [Native Persistence 3.0] PageMemoryImpl porting

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on a change in pull request #667:
URL: https://github.com/apache/ignite-3/pull/667#discussion_r816810838



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
##########
@@ -252,6 +252,23 @@ public static String toHexString(byte[] arr, int maxLen) {
         return sb.toString().toUpperCase();
     }
 
+    /**
+     * Returns hex representation of memory region.
+     *
+     * @param addr Pointer in memory.
+     * @param len How much byte to read (should divide 8).

Review comment:
       Fixed it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] tkalkirill commented on a change in pull request #667: IGNITE-16560 [Native Persistence 3.0] PageMemoryImpl porting

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on a change in pull request #667:
URL: https://github.com/apache/ignite-3/pull/667#discussion_r816842433



##########
File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java
##########
@@ -0,0 +1,1602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static java.lang.System.lineSeparator;
+import static org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema.CLOCK_REPLACEMENT_MODE;
+import static org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema.RANDOM_LRU_REPLACEMENT_MODE;
+import static org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema.SEGMENTED_LRU_REPLACEMENT_MODE;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getCrc;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getPageId;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getType;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getVersion;
+import static org.apache.ignite.internal.pagememory.io.PageIo.setPageId;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.dirty;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.fullPageId;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.isAcquired;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.tempBufferPointer;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.writeTimestamp;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.effectivePageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageIndex;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.tag;
+import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
+import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapInt;
+import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapLong;
+import static org.apache.ignite.internal.util.GridUnsafe.copyMemory;
+import static org.apache.ignite.internal.util.GridUnsafe.getInt;
+import static org.apache.ignite.internal.util.GridUnsafe.putIntVolatile;
+import static org.apache.ignite.internal.util.GridUnsafe.setMemory;
+import static org.apache.ignite.internal.util.GridUnsafe.wrapPointer;
+import static org.apache.ignite.internal.util.IgniteUtils.hash;
+import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
+import static org.apache.ignite.internal.util.IgniteUtils.readableSize;
+import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
+import static org.apache.ignite.internal.util.IgniteUtils.toHexString;
+import static org.apache.ignite.internal.util.OffheapReadWriteLock.TAG_LOCK_ALWAYS;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionView;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.pagememory.mem.DirectMemoryProvider;
+import org.apache.ignite.internal.pagememory.mem.DirectMemoryRegion;
+import org.apache.ignite.internal.pagememory.mem.IgniteOutOfMemoryException;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.OffheapReadWriteLock;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Page header structure is described by the following diagram.
+ *
+ * <p>When page is not allocated (in a free list):
+ * <pre>
+ * +--------+------------------------------------------------------+
+ * |8 bytes |         PAGE_SIZE + PAGE_OVERHEAD - 8 bytes          |
+ * +--------+------------------------------------------------------+
+ * |Next ptr|                      Page data                       |
+ * +--------+------------------------------------------------------+
+ * </pre>
+ *
+ * <p>When page is allocated and is in use:
+ * <pre>
+ * +------------------+--------+--------+----+----+--------+--------+----------------------+
+ * |     8 bytes      |8 bytes |8 bytes |4 b |4 b |8 bytes |8 bytes |       PAGE_SIZE      |
+ * +------------------+--------+--------+----+----+--------+--------+----------------------+
+ * | Marker/Timestamp |Rel ptr |Page ID |C ID|PIN | LOCK   |TMP BUF |       Page data      |
+ * +------------------+--------+--------+----+----+--------+--------+----------------------+
+ * </pre>
+ *
+ * <p>Note that first 8 bytes of page header are used either for page marker or for next relative pointer depending on whether the page is
+ * in use or not.
+ */
+@SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
+public class PageMemoryImpl implements PageMemoryEx {
+    /** Logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(PageMemoryImpl.class);
+
+    /** Full relative pointer mask. */
+    public static final long RELATIVE_PTR_MASK = 0xFFFFFFFFFFFFFFL;
+
+    /** Invalid relative pointer value. */
+    static final long INVALID_REL_PTR = RELATIVE_PTR_MASK;
+
+    /** Pointer which means that this page is outdated (for example, group was destroyed, partition eviction'd happened. */
+    static final long OUTDATED_REL_PTR = INVALID_REL_PTR + 1;
+
+    /** Page lock offset. */
+    public static final int PAGE_LOCK_OFFSET = 32;
+
+    /** 8b Marker/timestamp 8b Relative pointer 8b Page ID 4b Group ID 4b Pin count 8b Lock 8b Temporary buffer. */
+    public static final int PAGE_OVERHEAD = 48;
+
+    /** Try again tag. */
+    public static final int TRY_AGAIN_TAG = -1;
+
+    /** Data region configuration view. */
+    private final PageMemoryDataRegionView dataRegionCfg;
+
+    /** Page IO registry. */
+    private final PageIoRegistry ioRegistry;
+
+    /** Page manager. */
+    private final PageReadWriteManager pmPageMgr;
+
+    /** Page size. */
+    private final int sysPageSize;
+
+    /** Page replacement policy factory. */
+    private final PageReplacementPolicyFactory pageReplacementPolicyFactory;
+
+    /** Direct memory allocator. */
+    private final DirectMemoryProvider directMemoryProvider;
+
+    /** Segments array. */
+    private volatile Segment[] segments;
+
+    /** Lock for segments changes. */
+    private final Object segmentsLock = new Object();
+
+    /** Offheap read write lock instance. */
+    private final OffheapReadWriteLock rwLock;
+
+    /** Callback invoked to track changes in pages. {@code Null} if page tracking functionality is disabled. */
+    @Nullable
+    private final PageChangeTracker changeTracker;
+
+    /** Field updater. */
+    private static final AtomicIntegerFieldUpdater<PageMemoryImpl> pageReplacementWarnedFieldUpdater =
+            AtomicIntegerFieldUpdater.newUpdater(PageMemoryImpl.class, "pageReplacementWarned");
+
+    /** Flag indicating page replacement started (rotation with disk), allocating new page requires freeing old one. */
+    private volatile int pageReplacementWarned;
+
+    /** Segments sizes. */
+    private final long[] sizes;
+
+    /** {@code False} if memory was not started or already stopped and is not supposed for any usage. */
+    private volatile boolean started;
+
+    /**
+     * Constructor.
+     *
+     * @param directMemoryProvider Memory allocator to use.
+     * @param dataRegionCfg Data region configuration.
+     * @param ioRegistry IO registry.
+     * @param sizes Segments sizes.
+     * @param pmPageMgr Page store manager.
+     * @param changeTracker Callback invoked to track changes in pages.
+     */
+    public PageMemoryImpl(
+            DirectMemoryProvider directMemoryProvider,
+            PageMemoryDataRegionConfiguration dataRegionCfg,
+            PageIoRegistry ioRegistry,
+            long[] sizes,
+            PageReadWriteManager pmPageMgr,
+            @Nullable PageChangeTracker changeTracker
+    ) {
+        this.directMemoryProvider = directMemoryProvider;
+        this.dataRegionCfg = (PageMemoryDataRegionView) dataRegionCfg.value();
+        this.ioRegistry = ioRegistry;
+        this.sizes = sizes;
+        this.pmPageMgr = pmPageMgr;
+        this.changeTracker = changeTracker;
+
+        int pageSize = this.dataRegionCfg.pageSize();
+
+        sysPageSize = pageSize + PAGE_OVERHEAD;
+
+        rwLock = new OffheapReadWriteLock(128);
+
+        String replacementMode = this.dataRegionCfg.replacementMode();
+
+        switch (replacementMode) {
+            case RANDOM_LRU_REPLACEMENT_MODE:
+                pageReplacementPolicyFactory = new RandomLruPageReplacementPolicyFactory();
+
+                break;
+            case SEGMENTED_LRU_REPLACEMENT_MODE:
+                pageReplacementPolicyFactory = new SegmentedLruPageReplacementPolicyFactory();
+
+                break;
+            case CLOCK_REPLACEMENT_MODE:
+                pageReplacementPolicyFactory = new ClockPageReplacementPolicyFactory();
+
+                break;
+            default:
+                throw new IgniteInternalException("Unexpected page replacement mode: " + replacementMode);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void start() throws IgniteInternalException {
+        synchronized (segmentsLock) {
+            if (started) {
+                return;
+            }
+
+            started = true;
+
+            directMemoryProvider.initialize(sizes);
+
+            List<DirectMemoryRegion> regions = new ArrayList<>(sizes.length);
+
+            while (true) {
+                DirectMemoryRegion reg = directMemoryProvider.nextRegion();
+
+                if (reg == null) {
+                    break;
+                }
+
+                regions.add(reg);
+            }
+
+            int regs = regions.size();
+
+            Segment[] segments = new Segment[regs - 1];
+
+            DirectMemoryRegion cpReg = regions.get(regs - 1);
+
+            long checkpointBuf = cpReg.size();
+
+            long totalAllocated = 0;
+            int pages = 0;
+            long totalTblSize = 0;
+            long totalReplSize = 0;
+
+            for (int i = 0; i < regs - 1; i++) {
+                assert i < segments.length;
+
+                DirectMemoryRegion reg = regions.get(i);
+
+                totalAllocated += reg.size();
+
+                segments[i] = new Segment(i, regions.get(i));
+
+                pages += segments[i].pages();
+                totalTblSize += segments[i].tableSize();
+                totalReplSize += segments[i].replacementSize();
+            }
+
+            this.segments = segments;
+
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Started page memory [memoryAllocated=" + readableSize(totalAllocated, false)
+                        + ", pages=" + pages
+                        + ", tableSize=" + readableSize(totalTblSize, false)
+                        + ", replacementSize=" + readableSize(totalReplSize, false)
+                        + ", checkpointBuffer=" + readableSize(checkpointBuf, false)
+                        + ']');
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop(boolean deallocate) throws IgniteInternalException {
+        synchronized (segmentsLock) {
+            if (!started) {
+                return;
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Stopping page memory.");
+            }
+
+            if (segments != null) {
+                for (Segment seg : segments) {
+                    seg.close();
+                }
+            }
+
+            started = false;
+
+            directMemoryProvider.shutdown(deallocate);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void releasePage(int grpId, long pageId, long page) {
+        assert started;
+
+        Segment seg = segment(grpId, pageId);
+
+        seg.readLock().lock();
+
+        try {
+            seg.releasePage(page);
+        } finally {
+            seg.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long readLock(int grpId, long pageId, long page) {
+        assert started;
+
+        return readLock(page, pageId, false);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long readLock(long absPtr, long pageId, boolean force, boolean touch) {
+        assert started;
+
+        int tag = force ? -1 : tag(pageId);
+
+        boolean locked = rwLock.readLock(absPtr + PAGE_LOCK_OFFSET, tag);
+
+        if (!locked) {
+            return 0;
+        }
+
+        if (touch) {
+            writeTimestamp(absPtr, coarseCurrentTimeMillis());
+        }
+
+        assert getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO IGNITE-16612
+
+        return absPtr + PAGE_OVERHEAD;
+    }
+
+    private long readLock(long absPtr, long pageId, boolean force) {
+        return readLock(absPtr, pageId, force, true);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void readUnlock(int grpId, long pageId, long page) {
+        assert started;
+
+        readUnlockPage(page);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long writeLock(int grpId, long pageId, long page) {
+        assert started;
+
+        return writeLock(grpId, pageId, page, false);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long writeLock(int grpId, long pageId, long page, boolean restore) {
+        assert started;
+
+        return writeLockPage(page, new FullPageId(pageId, grpId), !restore);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long tryWriteLock(int grpId, long pageId, long page) {
+        assert started;
+
+        return tryWriteLockPage(page, new FullPageId(pageId, grpId), true);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeUnlock(int grpId, long pageId, long page, boolean dirtyFlag) {
+        assert started;
+
+        writeUnlock(grpId, pageId, page, dirtyFlag, false);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeUnlock(int grpId, long pageId, long page, boolean dirtyFlag, boolean restore) {
+        assert started;
+
+        writeUnlockPage(page, new FullPageId(pageId, grpId), dirtyFlag, restore);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isDirty(int grpId, long pageId, long page) {
+        assert started;
+
+        return isDirty(page);
+    }
+
+    /**
+     * Returns {@code true} if page is dirty.
+     *
+     * @param absPtr Absolute pointer.
+     */
+    boolean isDirty(long absPtr) {
+        return dirty(absPtr);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long allocatePage(int grpId, int partId, byte flags) throws IgniteInternalCheckedException {
+        assert partId <= MAX_PARTITION_ID || partId == INDEX_PARTITION && flags == FLAG_AUX : "flags = " + flags + ", partId = " + partId;
+
+        assert started;
+
+        long pageId = pmPageMgr.allocatePage(grpId, partId, flags);
+
+        assert pageIndex(pageId) > 0; //it's crucial for tracking pages (zero page is super one)
+
+        // We need to allocate page in memory for marking it dirty to save it in the next checkpoint.
+        // Otherwise it is possible that on file will be empty page which will be saved at snapshot and read with error
+        // because there is no crc inside them.
+        Segment seg = segment(grpId, pageId);
+
+        seg.writeLock().lock();
+
+        FullPageId fullId = new FullPageId(pageId, grpId);
+
+        try {
+            long relPtr = seg.loadedPages.get(
+                    grpId,
+                    effectivePageId(pageId),
+                    seg.partGeneration(grpId, partId),
+                    INVALID_REL_PTR,
+                    OUTDATED_REL_PTR
+            );
+
+            if (relPtr == OUTDATED_REL_PTR) {
+                relPtr = seg.refreshOutdatedPage(grpId, pageId, false);
+
+                seg.pageReplacementPolicy.onRemove(relPtr);
+            }
+
+            if (relPtr == INVALID_REL_PTR) {
+                relPtr = seg.borrowOrAllocateFreePage(pageId);
+            }
+
+            if (relPtr == INVALID_REL_PTR) {
+                relPtr = seg.removePageForReplacement();
+            }
+
+            long absPtr = seg.absolute(relPtr);
+
+            setMemory(absPtr + PAGE_OVERHEAD, pageSize(), (byte) 0);
+
+            fullPageId(absPtr, fullId);
+            writeTimestamp(absPtr, coarseCurrentTimeMillis());
+            rwLock.init(absPtr + PAGE_LOCK_OFFSET, tag(pageId));
+
+            assert getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO IGNITE-16612
+
+            assert !isAcquired(absPtr) :
+                    "Pin counter must be 0 for a new page [relPtr=" + hexLong(relPtr)
+                            + ", absPtr=" + hexLong(absPtr) + ", pinCntr=" + PageHeader.pinCount(absPtr) + ']';
+
+            setDirty(fullId, absPtr, true, true);
+
+            seg.pageReplacementPolicy.onMiss(relPtr);
+
+            seg.loadedPages.put(grpId, effectivePageId(pageId), relPtr, seg.partGeneration(grpId, partId));
+        } catch (IgniteOutOfMemoryException oom) {
+            IgniteOutOfMemoryException e = new IgniteOutOfMemoryException("Out of memory in data region ["
+                    + "name=" + dataRegionCfg.name()
+                    + ", initSize=" + readableSize(dataRegionCfg.initSize(), false)
+                    + ", maxSize=" + readableSize(dataRegionCfg.maxSize(), false)
+                    + ", persistenceEnabled=" + dataRegionCfg.persistent() + "] Try the following:" + lineSeparator()
+                    + "  ^-- Increase maximum off-heap memory size (PageMemoryDataRegionConfiguration.maxSize)" + lineSeparator()
+                    + "  ^-- Enable eviction or expiration policies"
+            );
+
+            e.initCause(oom);
+
+            throw e;
+        } finally {
+            seg.writeLock().unlock();
+        }
+
+        return pageId;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ByteBuffer pageBuffer(long pageAddr) {
+        return wrapPointer(pageAddr, pageSize());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean freePage(int grpId, long pageId) {
+        assert false : "Free page should be never called directly when persistence is enabled.";
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long partitionMetaPageId(int grpId, int partId) {
+        assert started;
+
+        return pageId(partId, FLAG_DATA, 0);

Review comment:
       Left the TODO.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] tkalkirill commented on a change in pull request #667: IGNITE-16560 [Native Persistence 3.0] PageMemoryImpl porting

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on a change in pull request #667:
URL: https://github.com/apache/ignite-3/pull/667#discussion_r816816954



##########
File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/GroupPartitionId.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Pair of group ID with partition ID. Immutable, comparable class, may be used as key in maps.
+ */
+public class GroupPartitionId implements Comparable<GroupPartitionId> {
+    /** Index of the partition meta (super) page. Such a page always exists for an iterated cache partition. */
+    private static final int METAPAGE_IDX = 0;
+
+    /** Group ID. */
+    private final int grpId;
+
+    /** Partition ID within the group. */
+    private final int partId;
+
+    /**
+     * Creates an immutable (group ID, partition ID) pair.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     */
+    public GroupPartitionId(final int grpId, final int partId) {
+        this.partId = partId;
+        this.grpId = grpId;
+    }
+
+    /**
+     * Returns the page flag for a partition: {@code FLAG_AUX} for the index partition, {@code FLAG_DATA} for any other one.
+     *
+     * @param partId Partition ID.
+     */
+    public static byte getFlagByPartId(final int partId) {
+        if (partId == INDEX_PARTITION) {
+            return FLAG_AUX;
+        }
+
+        return FLAG_DATA;
+    }
+
+    /**
+     * Returns the group ID of this pair.
+     */
+    public int getGroupId() {
+        return this.grpId;
+    }
+
+    /**
+     * Returns the partition ID of this pair.
+     */
+    public int getPartitionId() {
+        return this.partId;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString() {
+        // Reflection-based rendering of the fields via the project's S helper.
+        return S.toString(GroupPartitionId.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        // Exact class match (rather than instanceof), so instances of different classes are never equal.
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        GroupPartitionId other = (GroupPartitionId) o;
+
+        return grpId == other.grpId && partId == other.partId;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int hashCode() {
+        // Classic 31-multiplier combination of both ID components.
+        return 31 * grpId + partId;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int compareTo(GroupPartitionId o) {
+        if (getGroupId() < o.getGroupId()) {
+            return -1;
+        }
+
+        if (getGroupId() > o.getGroupId()) {
+            return 1;
+        }
+
+        if (getPartitionId() < o.getPartitionId()) {
+            return -1;
+        }
+
+        if (getPartitionId() > o.getPartitionId()) {
+            return 1;
+        }
+
+        return 0;
+    }
+
+    /**
+     * Returns page ID (64 bits) constructed from partition ID and given index.
+     *
+     * @param pageIdx Page Index, monotonically growing number within each partition.
+     */
+    private long createPageId(final int pageIdx) {
+        final int partId = getPartitionId();
+
+        return pageId(partId, getFlagByPartId(partId), pageIdx);

Review comment:
       The code is not needed, so I got rid of it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] tkalkirill commented on a change in pull request #667: IGNITE-16560 [Native Persistence 3.0] PageMemoryImpl porting

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on a change in pull request #667:
URL: https://github.com/apache/ignite-3/pull/667#discussion_r816815484



##########
File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/GroupPartitionId.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Pair of group ID with partition ID. Immutable, comparable class, may be used as key in maps.
+ */
+public class GroupPartitionId implements Comparable<GroupPartitionId> {
+    /** Index of the partition meta (super) page. Such a page always exists for an iterated cache partition. */
+    private static final int METAPAGE_IDX = 0;
+
+    /** Group ID. */
+    private final int grpId;
+
+    /** Partition ID within the group. */
+    private final int partId;
+
+    /**
+     * Creates an immutable (group ID, partition ID) pair.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     */
+    public GroupPartitionId(final int grpId, final int partId) {
+        this.partId = partId;
+        this.grpId = grpId;
+    }
+
+    /**
+     * Returns flag to be used for partition.
+     *
+     * @param partId Partition ID.
+     */
+    public static byte getFlagByPartId(final int partId) {

Review comment:
       Not used, remove the code associated with it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] tkalkirill commented on a change in pull request #667: IGNITE-16560 [Native Persistence 3.0] PageMemoryImpl porting

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on a change in pull request #667:
URL: https://github.com/apache/ignite-3/pull/667#discussion_r816826484



##########
File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementFlags.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapLong;
+import static org.apache.ignite.internal.util.GridUnsafe.getLong;
+import static org.apache.ignite.internal.util.GridUnsafe.putLong;
+import static org.apache.ignite.internal.util.GridUnsafe.setMemory;
+
+import java.util.function.LongUnaryOperator;
+
+/**
+ * Clock page replacement algorithm implementation.
+ */
+public class ClockPageReplacementFlags {
+    /** Total number of pages tracked by these flags. */
+    private final int pagesCnt;
+
+    /** Index of the next replacement candidate (the clock "hand"). */
+    private int curIdx;
+
+    /** Pointer to the memory region that stores the page hit flags (the constructor zeroes one bit per page). */
+    private final long flagsPtr;
+
+    /**
+     * Constructor.
+     *
+     * @param totalPagesCnt Total pages count.
+     * @param memPtr Pointer to memory region.
+     */
+    ClockPageReplacementFlags(int totalPagesCnt, long memPtr) {
+        pagesCnt = totalPagesCnt;
+        flagsPtr = memPtr;
+
+        setMemory(flagsPtr, (totalPagesCnt + 7) >> 3, (byte) 0);

Review comment:
       Fix it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] tkalkirill commented on a change in pull request #667: IGNITE-16560 [Native Persistence 3.0] PageMemoryImpl porting

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on a change in pull request #667:
URL: https://github.com/apache/ignite-3/pull/667#discussion_r816820202



##########
File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryEx.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.configuration.storage.StorageException;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * Page memory with some persistence related additions.
+ */
+//TODO IGNITE-16350 Improve javadoc in this class.
+public interface PageMemoryEx extends PageMemory {

Review comment:
       Added a todo




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] tkalkirill commented on a change in pull request #667: IGNITE-16560 [Native Persistence 3.0] PageMemoryImpl porting

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on a change in pull request #667:
URL: https://github.com/apache/ignite-3/pull/667#discussion_r816832043



##########
File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java
##########
@@ -0,0 +1,1602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static java.lang.System.lineSeparator;
+import static org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema.CLOCK_REPLACEMENT_MODE;
+import static org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema.RANDOM_LRU_REPLACEMENT_MODE;
+import static org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema.SEGMENTED_LRU_REPLACEMENT_MODE;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getCrc;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getPageId;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getType;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getVersion;
+import static org.apache.ignite.internal.pagememory.io.PageIo.setPageId;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.dirty;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.fullPageId;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.isAcquired;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.tempBufferPointer;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.writeTimestamp;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.effectivePageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageIndex;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.tag;
+import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
+import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapInt;
+import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapLong;
+import static org.apache.ignite.internal.util.GridUnsafe.copyMemory;
+import static org.apache.ignite.internal.util.GridUnsafe.getInt;
+import static org.apache.ignite.internal.util.GridUnsafe.putIntVolatile;
+import static org.apache.ignite.internal.util.GridUnsafe.setMemory;
+import static org.apache.ignite.internal.util.GridUnsafe.wrapPointer;
+import static org.apache.ignite.internal.util.IgniteUtils.hash;
+import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
+import static org.apache.ignite.internal.util.IgniteUtils.readableSize;
+import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
+import static org.apache.ignite.internal.util.IgniteUtils.toHexString;
+import static org.apache.ignite.internal.util.OffheapReadWriteLock.TAG_LOCK_ALWAYS;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionView;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.pagememory.mem.DirectMemoryProvider;
+import org.apache.ignite.internal.pagememory.mem.DirectMemoryRegion;
+import org.apache.ignite.internal.pagememory.mem.IgniteOutOfMemoryException;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.OffheapReadWriteLock;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Page header structure is described by the following diagram.
+ *
+ * <p>When page is not allocated (in a free list):
+ * <pre>
+ * +--------+------------------------------------------------------+
+ * |8 bytes |         PAGE_SIZE + PAGE_OVERHEAD - 8 bytes          |
+ * +--------+------------------------------------------------------+
+ * |Next ptr|                      Page data                       |
+ * +--------+------------------------------------------------------+
+ * </pre>
+ *
+ * <p>When page is allocated and is in use:
+ * <pre>
+ * +------------------+--------+--------+----+----+--------+--------+----------------------+
+ * |     8 bytes      |8 bytes |8 bytes |4 b |4 b |8 bytes |8 bytes |       PAGE_SIZE      |
+ * +------------------+--------+--------+----+----+--------+--------+----------------------+
+ * | Marker/Timestamp |Rel ptr |Page ID |C ID|PIN | LOCK   |TMP BUF |       Page data      |
+ * +------------------+--------+--------+----+----+--------+--------+----------------------+
+ * </pre>
+ *
+ * <p>Note that first 8 bytes of page header are used either for page marker or for next relative pointer depending on whether the page is
+ * in use or not.
+ */
+@SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
+public class PageMemoryImpl implements PageMemoryEx {
+    /** Logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(PageMemoryImpl.class);
+
+    /** Full relative pointer mask (lower 56 bits). */
+    public static final long RELATIVE_PTR_MASK = 0xFFFFFFFFFFFFFFL;
+
+    /** Invalid relative pointer value. */
+    static final long INVALID_REL_PTR = RELATIVE_PTR_MASK;
+
+    /** Pointer which means that this page is outdated (for example, the group was destroyed or the partition was evicted). */
+    static final long OUTDATED_REL_PTR = INVALID_REL_PTR + 1;
+
+    /** Offset of the page lock within the page header (after marker, relative pointer, page ID, group ID and pin count). */
+    public static final int PAGE_LOCK_OFFSET = 32;
+
+    /** 8b Marker/timestamp 8b Relative pointer 8b Page ID 4b Group ID 4b Pin count 8b Lock 8b Temporary buffer. */
+    public static final int PAGE_OVERHEAD = 48;
+
+    /** Try again tag. */
+    public static final int TRY_AGAIN_TAG = -1;
+
+    /** Data region configuration view. */
+    private final PageMemoryDataRegionView dataRegionCfg;
+
+    /** Page IO registry. */
+    private final PageIoRegistry ioRegistry;
+
+    /** Page read/write manager, also used to allocate new page IDs. */
+    private final PageReadWriteManager pmPageMgr;
+
+    /** System page size: the configured page size plus {@link #PAGE_OVERHEAD}. */
+    private final int sysPageSize;
+
+    /** Page replacement policy factory, chosen from the configured replacement mode. */
+    private final PageReplacementPolicyFactory pageReplacementPolicyFactory;
+
+    /** Direct memory allocator. */
+    private final DirectMemoryProvider directMemoryProvider;
+
+    /** Segments array. */
+    private volatile Segment[] segments;
+
+    /** Monitor guarding start/stop and changes of the segments. */
+    private final Object segmentsLock = new Object();
+
+    /** Off-heap read/write lock used for page locks located at {@code absPtr + PAGE_LOCK_OFFSET}. */
+    private final OffheapReadWriteLock rwLock;
+
+    /** Callback invoked to track changes in pages. {@code Null} if page tracking functionality is disabled. */
+    @Nullable
+    private final PageChangeTracker changeTracker;
+
+    /** Atomic updater for the {@link #pageReplacementWarned} field. */
+    private static final AtomicIntegerFieldUpdater<PageMemoryImpl> pageReplacementWarnedFieldUpdater =
+            AtomicIntegerFieldUpdater.newUpdater(PageMemoryImpl.class, "pageReplacementWarned");
+
+    /** Flag indicating page replacement started (rotation with disk), allocating new page requires freeing old one. */
+    private volatile int pageReplacementWarned;
+
+    /** Segment sizes passed to the direct memory provider on start. */
+    private final long[] sizes;
+
+    /** {@code False} if memory was not started or is already stopped and must not be used. */
+    private volatile boolean started;
+
+    /**
+     * Creates page memory on top of the given direct memory provider; memory is not allocated until {@link #start()}.
+     *
+     * @param directMemoryProvider Memory allocator to use.
+     * @param dataRegionCfg Data region configuration.
+     * @param ioRegistry IO registry.
+     * @param sizes Segments sizes.
+     * @param pmPageMgr Page store manager.
+     * @param changeTracker Callback invoked to track changes in pages, {@code null} if tracking is disabled.
+     * @throws IgniteInternalException If the configured page replacement mode is not supported.
+     */
+    public PageMemoryImpl(
+            DirectMemoryProvider directMemoryProvider,
+            PageMemoryDataRegionConfiguration dataRegionCfg,
+            PageIoRegistry ioRegistry,
+            long[] sizes,
+            PageReadWriteManager pmPageMgr,
+            @Nullable PageChangeTracker changeTracker
+    ) {
+        PageMemoryDataRegionView regionView = (PageMemoryDataRegionView) dataRegionCfg.value();
+
+        this.directMemoryProvider = directMemoryProvider;
+        this.dataRegionCfg = regionView;
+        this.ioRegistry = ioRegistry;
+        this.sizes = sizes;
+        this.pmPageMgr = pmPageMgr;
+        this.changeTracker = changeTracker;
+
+        // Each page in memory is preceded by a fixed-size header.
+        sysPageSize = regionView.pageSize() + PAGE_OVERHEAD;
+
+        rwLock = new OffheapReadWriteLock(128);
+
+        String mode = regionView.replacementMode();
+
+        if (mode.equals(RANDOM_LRU_REPLACEMENT_MODE)) {
+            pageReplacementPolicyFactory = new RandomLruPageReplacementPolicyFactory();
+        } else if (mode.equals(SEGMENTED_LRU_REPLACEMENT_MODE)) {
+            pageReplacementPolicyFactory = new SegmentedLruPageReplacementPolicyFactory();
+        } else if (mode.equals(CLOCK_REPLACEMENT_MODE)) {
+            pageReplacementPolicyFactory = new ClockPageReplacementPolicyFactory();
+        } else {
+            throw new IgniteInternalException("Unexpected page replacement mode: " + mode);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Idempotent: does nothing if the memory is already started. All regions returned by the provider except the last one become
+     * segments; the size of the last region is reported as the checkpoint buffer.
+     */
+    @Override
+    public void start() throws IgniteInternalException {
+        synchronized (segmentsLock) {
+            if (started) {
+                return;
+            }
+
+            started = true;
+
+            directMemoryProvider.initialize(sizes);
+
+            List<DirectMemoryRegion> regions = new ArrayList<>(sizes.length);
+
+            for (DirectMemoryRegion reg = directMemoryProvider.nextRegion(); reg != null;
+                    reg = directMemoryProvider.nextRegion()) {
+                regions.add(reg);
+            }
+
+            int segCnt = regions.size() - 1;
+
+            Segment[] segs = new Segment[segCnt];
+
+            long checkpointBuf = regions.get(segCnt).size();
+
+            long totalAllocated = 0;
+            int pages = 0;
+            long totalTblSize = 0;
+            long totalReplSize = 0;
+
+            for (int idx = 0; idx < segCnt; idx++) {
+                DirectMemoryRegion region = regions.get(idx);
+
+                totalAllocated += region.size();
+
+                Segment seg = new Segment(idx, region);
+
+                segs[idx] = seg;
+
+                pages += seg.pages();
+                totalTblSize += seg.tableSize();
+                totalReplSize += seg.replacementSize();
+            }
+
+            this.segments = segs;
+
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Started page memory [memoryAllocated=" + readableSize(totalAllocated, false)
+                        + ", pages=" + pages
+                        + ", tableSize=" + readableSize(totalTblSize, false)
+                        + ", replacementSize=" + readableSize(totalReplSize, false)
+                        + ", checkpointBuffer=" + readableSize(checkpointBuf, false)
+                        + ']');
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop(boolean deallocate) throws IgniteInternalException {
+        synchronized (segmentsLock) {
+            if (!started) {
+                return;
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Stopping page memory.");
+            }
+
+            if (segments != null) {
+                for (Segment seg : segments) {
+                    seg.close();
+                }
+            }
+
+            started = false;
+
+            directMemoryProvider.shutdown(deallocate);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>The page is released while holding the read lock of the segment that owns it.
+     */
+    @Override
+    public void releasePage(int grpId, long pageId, long page) {
+        assert started;
+
+        Segment owner = segment(grpId, pageId);
+
+        owner.readLock().lock();
+        try {
+            owner.releasePage(page);
+        } finally {
+            owner.readLock().unlock();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Non-forced lock that also refreshes the page access timestamp.
+     */
+    @Override
+    public long readLock(int grpId, long pageId, long page) {
+        assert started;
+
+        return readLock(page, pageId, /* force */ false, /* touch */ true);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Returns the address of the page data (right after the page header), or {@code 0} if the lock could not be acquired.
+     */
+    @Override
+    public long readLock(long absPtr, long pageId, boolean force, boolean touch) {
+        assert started;
+
+        // A forced lock uses the special TAG_LOCK_ALWAYS tag instead of the tag encoded in the page ID.
+        int tag = force ? TAG_LOCK_ALWAYS : tag(pageId);
+
+        boolean locked = rwLock.readLock(absPtr + PAGE_LOCK_OFFSET, tag);
+
+        if (!locked) {
+            return 0;
+        }
+
+        if (touch) {
+            // Record the access time in the page header.
+            writeTimestamp(absPtr, coarseCurrentTimeMillis());
+        }
+
+        assert getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO IGNITE-16612
+
+        return absPtr + PAGE_OVERHEAD;
+    }
+
+    /**
+     * Read-locks a page and always refreshes its access timestamp.
+     *
+     * @param absPtr Absolute pointer to the page.
+     * @param pageId Page ID.
+     * @param force Whether to lock with the "always" tag instead of the tag of {@code pageId}.
+     * @return Address of the page data, or {@code 0} if the lock was not acquired.
+     */
+    private long readLock(long absPtr, long pageId, boolean force) {
+        return readLock(absPtr, pageId, force, /* touch */ true);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Only the page pointer is needed to unlock; the group and page IDs are not used.
+     */
+    @Override
+    public void readUnlock(int grpId, long pageId, long page) {
+        assert started;
+
+        readUnlockPage(page);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Equivalent to the overload with {@code restore == false}.
+     */
+    @Override
+    public long writeLock(int grpId, long pageId, long page) {
+        assert started;
+
+        return writeLock(grpId, pageId, page, /* restore */ false);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long writeLock(int grpId, long pageId, long page, boolean restore) {
+        assert started;
+
+        FullPageId fullId = new FullPageId(pageId, grpId);
+
+        // The last argument of writeLockPage is the negation of the restore flag.
+        return writeLockPage(page, fullId, !restore);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long tryWriteLock(int grpId, long pageId, long page) {
+        assert started;
+
+        FullPageId fullId = new FullPageId(pageId, grpId);
+
+        return tryWriteLockPage(page, fullId, true);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Equivalent to the overload with {@code restore == false}.
+     */
+    @Override
+    public void writeUnlock(int grpId, long pageId, long page, boolean dirtyFlag) {
+        assert started;
+
+        writeUnlock(grpId, pageId, page, dirtyFlag, /* restore */ false);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeUnlock(int grpId, long pageId, long page, boolean dirtyFlag, boolean restore) {
+        assert started;
+
+        FullPageId fullId = new FullPageId(pageId, grpId);
+
+        writeUnlockPage(page, fullId, dirtyFlag, restore);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Only the page pointer is inspected; the group and page IDs are not used.
+     */
+    @Override
+    public boolean isDirty(int grpId, long pageId, long page) {
+        assert started;
+
+        return isDirty(page);
+    }
+
+    /**
+     * Checks the dirty flag of a page by reading it through {@code PageHeader#dirty}.
+     *
+     * @param absPtr Absolute pointer to the page (its header).
+     * @return {@code true} if the page is dirty.
+     */
+    boolean isDirty(long absPtr) {
+        boolean pageDirty = dirty(absPtr);
+
+        return pageDirty;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long allocatePage(int grpId, int partId, byte flags) throws IgniteInternalCheckedException {
+        assert partId <= MAX_PARTITION_ID || partId == INDEX_PARTITION && flags == FLAG_AUX : "flags = " + flags + ", partId = " + partId;
+
+        assert started;
+
+        long pageId = pmPageMgr.allocatePage(grpId, partId, flags);
+
+        assert pageIndex(pageId) > 0; //it's crucial for tracking pages (zero page is super one)
+
+        // We need to allocate page in memory for marking it dirty to save it in the next checkpoint.
+        // Otherwise it is possible that on file will be empty page which will be saved at snapshot and read with error
+        // because there is no crc inside them.
+        Segment seg = segment(grpId, pageId);
+
+        seg.writeLock().lock();
+
+        FullPageId fullId = new FullPageId(pageId, grpId);
+
+        try {
+            long relPtr = seg.loadedPages.get(
+                    grpId,
+                    effectivePageId(pageId),
+                    seg.partGeneration(grpId, partId),
+                    INVALID_REL_PTR,
+                    OUTDATED_REL_PTR
+            );
+
+            if (relPtr == OUTDATED_REL_PTR) {
+                relPtr = seg.refreshOutdatedPage(grpId, pageId, false);
+
+                seg.pageReplacementPolicy.onRemove(relPtr);
+            }
+
+            if (relPtr == INVALID_REL_PTR) {
+                relPtr = seg.borrowOrAllocateFreePage(pageId);
+            }
+
+            if (relPtr == INVALID_REL_PTR) {
+                relPtr = seg.removePageForReplacement();
+            }
+
+            long absPtr = seg.absolute(relPtr);
+
+            setMemory(absPtr + PAGE_OVERHEAD, pageSize(), (byte) 0);
+
+            fullPageId(absPtr, fullId);
+            writeTimestamp(absPtr, coarseCurrentTimeMillis());
+            rwLock.init(absPtr + PAGE_LOCK_OFFSET, tag(pageId));
+
+            assert getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO IGNITE-16612
+
+            assert !isAcquired(absPtr) :
+                    "Pin counter must be 0 for a new page [relPtr=" + hexLong(relPtr)
+                            + ", absPtr=" + hexLong(absPtr) + ", pinCntr=" + PageHeader.pinCount(absPtr) + ']';
+
+            setDirty(fullId, absPtr, true, true);
+
+            seg.pageReplacementPolicy.onMiss(relPtr);
+
+            seg.loadedPages.put(grpId, effectivePageId(pageId), relPtr, seg.partGeneration(grpId, partId));
+        } catch (IgniteOutOfMemoryException oom) {
+            IgniteOutOfMemoryException e = new IgniteOutOfMemoryException("Out of memory in data region ["
+                    + "name=" + dataRegionCfg.name()
+                    + ", initSize=" + readableSize(dataRegionCfg.initSize(), false)
+                    + ", maxSize=" + readableSize(dataRegionCfg.maxSize(), false)
+                    + ", persistenceEnabled=" + dataRegionCfg.persistent() + "] Try the following:" + lineSeparator()
+                    + "  ^-- Increase maximum off-heap memory size (PageMemoryDataRegionConfiguration.maxSize)" + lineSeparator()
+                    + "  ^-- Enable eviction or expiration policies"
+            );
+
+            e.initCause(oom);
+
+            throw e;
+        } finally {
+            seg.writeLock().unlock();
+        }
+
+        return pageId;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ByteBuffer pageBuffer(long pageAddr) {
+        return wrapPointer(pageAddr, pageSize());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean freePage(int grpId, long pageId) {
+        assert false : "Free page should be never called directly when persistence is enabled.";
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long partitionMetaPageId(int grpId, int partId) {
+        assert started;
+
+        return pageId(partId, FLAG_DATA, 0);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long acquirePage(int grpId, long pageId) throws IgniteInternalCheckedException {
+        return acquirePage(grpId, pageId, IoStatisticsHolderNoOp.INSTANCE, false);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long acquirePage(int grpId, long pageId, IoStatisticsHolder statHolder) throws IgniteInternalCheckedException {
+        assert started;
+
+        return acquirePage(grpId, pageId, statHolder, false);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long acquirePage(int grpId, long pageId, AtomicBoolean pageAllocated) throws IgniteInternalCheckedException {
+        return acquirePage(grpId, pageId, IoStatisticsHolderNoOp.INSTANCE, false, pageAllocated);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long acquirePage(int grpId, long pageId, IoStatisticsHolder statHolder, boolean restore) throws IgniteInternalCheckedException {
+        return acquirePage(grpId, pageId, statHolder, restore, null);
+    }
+
+    private long acquirePage(
+            int grpId,
+            long pageId,
+            IoStatisticsHolder statHolder,
+            boolean restore,
+            @Nullable AtomicBoolean pageAllocated
+    ) throws IgniteInternalCheckedException {
+        assert started;
+
+        int partId = partitionId(pageId);
+
+        Segment seg = segment(grpId, pageId);
+
+        seg.readLock().lock();
+
+        try {
+            long relPtr = seg.loadedPages.get(
+                    grpId,
+                    effectivePageId(pageId),
+                    seg.partGeneration(grpId, partId),
+                    INVALID_REL_PTR,
+                    INVALID_REL_PTR
+            );
+
+            // The page is loaded to the memory.
+            if (relPtr != INVALID_REL_PTR) {
+                long absPtr = seg.absolute(relPtr);
+
+                seg.acquirePage(absPtr);
+
+                seg.pageReplacementPolicy.onHit(relPtr);
+
+                statHolder.trackLogicalRead(absPtr + PAGE_OVERHEAD);
+
+                return absPtr;
+            }
+        } finally {
+            seg.readLock().unlock();
+        }
+
+        FullPageId fullId = new FullPageId(pageId, grpId);
+
+        seg.writeLock().lock();
+
+        long lockedPageAbsPtr = -1;
+        boolean readPageFromStore = false;
+
+        try {
+            // Double-check.
+            long relPtr = seg.loadedPages.get(
+                    grpId,
+                    fullId.effectivePageId(),
+                    seg.partGeneration(grpId, partId),
+                    INVALID_REL_PTR,
+                    OUTDATED_REL_PTR
+            );
+
+            long absPtr;
+
+            if (relPtr == INVALID_REL_PTR) {
+                relPtr = seg.borrowOrAllocateFreePage(pageId);
+
+                if (pageAllocated != null) {
+                    pageAllocated.set(true);
+                }
+
+                if (relPtr == INVALID_REL_PTR) {
+                    relPtr = seg.removePageForReplacement();
+                }
+
+                absPtr = seg.absolute(relPtr);
+
+                fullPageId(absPtr, fullId);
+                writeTimestamp(absPtr, coarseCurrentTimeMillis());
+
+                assert !isAcquired(absPtr) :
+                        "Pin counter must be 0 for a new page [relPtr=" + hexLong(relPtr) + ", absPtr=" + hexLong(absPtr) + ']';
+
+                // We can clear dirty flag after the page has been allocated.
+                setDirty(fullId, absPtr, false, false);
+
+                seg.pageReplacementPolicy.onMiss(relPtr);
+
+                seg.loadedPages.put(
+                        grpId,
+                        fullId.effectivePageId(),
+                        relPtr,
+                        seg.partGeneration(grpId, partId)
+                );
+
+                long pageAddr = absPtr + PAGE_OVERHEAD;
+
+                if (!restore) {
+                    readPageFromStore = true;
+                } else {
+                    setMemory(absPtr + PAGE_OVERHEAD, pageSize(), (byte) 0);
+
+                    // Must init page ID in order to ensure RWLock tag consistency.
+                    setPageId(pageAddr, pageId);
+                }
+
+                rwLock.init(absPtr + PAGE_LOCK_OFFSET, tag(pageId));
+
+                if (readPageFromStore) {
+                    boolean locked = rwLock.writeLock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS);
+
+                    assert locked : "Page ID " + fullId + " expected to be locked";
+
+                    lockedPageAbsPtr = absPtr;
+                }
+            } else if (relPtr == OUTDATED_REL_PTR) {
+                assert pageIndex(pageId) == 0 : fullId;
+
+                relPtr = seg.refreshOutdatedPage(grpId, pageId, false);
+
+                absPtr = seg.absolute(relPtr);
+
+                long pageAddr = absPtr + PAGE_OVERHEAD;
+
+                setMemory(pageAddr, pageSize(), (byte) 0);
+
+                fullPageId(absPtr, fullId);
+                writeTimestamp(absPtr, coarseCurrentTimeMillis());
+                setPageId(pageAddr, pageId);
+
+                assert !isAcquired(absPtr) :
+                        "Pin counter must be 0 for a new page [relPtr=" + hexLong(relPtr) + ", absPtr=" + hexLong(absPtr) + ']';
+
+                rwLock.init(absPtr + PAGE_LOCK_OFFSET, tag(pageId));
+
+                seg.pageReplacementPolicy.onRemove(relPtr);
+                seg.pageReplacementPolicy.onMiss(relPtr);
+            } else {
+                absPtr = seg.absolute(relPtr);
+
+                seg.pageReplacementPolicy.onHit(relPtr);
+            }
+
+            seg.acquirePage(absPtr);
+
+            if (!readPageFromStore) {
+                statHolder.trackLogicalRead(absPtr + PAGE_OVERHEAD);
+            }
+
+            return absPtr;
+        } catch (IgniteOutOfMemoryException oom) {
+            throw oom;

Review comment:
       Forgot to remove when refactoring.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] tkalkirill commented on a change in pull request #667: IGNITE-16560 [Native Persistence 3.0] PageMemoryImpl porting

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on a change in pull request #667:
URL: https://github.com/apache/ignite-3/pull/667#discussion_r816835265



##########
File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java
##########
@@ -0,0 +1,1602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static java.lang.System.lineSeparator;
+import static org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema.CLOCK_REPLACEMENT_MODE;
+import static org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema.RANDOM_LRU_REPLACEMENT_MODE;
+import static org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema.SEGMENTED_LRU_REPLACEMENT_MODE;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getCrc;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getPageId;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getType;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getVersion;
+import static org.apache.ignite.internal.pagememory.io.PageIo.setPageId;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.dirty;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.fullPageId;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.isAcquired;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.tempBufferPointer;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.writeTimestamp;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.effectivePageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageIndex;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.tag;
+import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
+import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapInt;
+import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapLong;
+import static org.apache.ignite.internal.util.GridUnsafe.copyMemory;
+import static org.apache.ignite.internal.util.GridUnsafe.getInt;
+import static org.apache.ignite.internal.util.GridUnsafe.putIntVolatile;
+import static org.apache.ignite.internal.util.GridUnsafe.setMemory;
+import static org.apache.ignite.internal.util.GridUnsafe.wrapPointer;
+import static org.apache.ignite.internal.util.IgniteUtils.hash;
+import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
+import static org.apache.ignite.internal.util.IgniteUtils.readableSize;
+import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
+import static org.apache.ignite.internal.util.IgniteUtils.toHexString;
+import static org.apache.ignite.internal.util.OffheapReadWriteLock.TAG_LOCK_ALWAYS;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionView;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.pagememory.mem.DirectMemoryProvider;
+import org.apache.ignite.internal.pagememory.mem.DirectMemoryRegion;
+import org.apache.ignite.internal.pagememory.mem.IgniteOutOfMemoryException;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.OffheapReadWriteLock;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Page header structure is described by the following diagram.
+ *
+ * <p>When page is not allocated (in a free list):
+ * <pre>
+ * +--------+------------------------------------------------------+
+ * |8 bytes |         PAGE_SIZE + PAGE_OVERHEAD - 8 bytes          |
+ * +--------+------------------------------------------------------+
+ * |Next ptr|                      Page data                       |
+ * +--------+------------------------------------------------------+
+ * </pre>
+ *
+ * <p>When page is allocated and is in use:
+ * <pre>
+ * +------------------+--------+--------+----+----+--------+--------+----------------------+
+ * |     8 bytes      |8 bytes |8 bytes |4 b |4 b |8 bytes |8 bytes |       PAGE_SIZE      |
+ * +------------------+--------+--------+----+----+--------+--------+----------------------+
+ * | Marker/Timestamp |Rel ptr |Page ID |C ID|PIN | LOCK   |TMP BUF |       Page data      |
+ * +------------------+--------+--------+----+----+--------+--------+----------------------+
+ * </pre>
+ *
+ * <p>Note that first 8 bytes of page header are used either for page marker or for next relative pointer depending on whether the page is
+ * in use or not.
+ */
+@SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
+public class PageMemoryImpl implements PageMemoryEx {
+    /** Logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(PageMemoryImpl.class);
+
+    /** Full relative pointer mask: the low 56 bits of a {@code long}. */
+    public static final long RELATIVE_PTR_MASK = 0xFFFFFFFFFFFFFFL;
+
+    /** Invalid relative pointer value; equal to {@link #RELATIVE_PTR_MASK}. */
+    static final long INVALID_REL_PTR = RELATIVE_PTR_MASK;
+
+    /**
+     * Pointer value meaning that the page is outdated (for example, the group was destroyed or a partition eviction happened).
+     * Equal to {@code INVALID_REL_PTR + 1}, which lies outside the range covered by {@link #RELATIVE_PTR_MASK}.
+     */
+    static final long OUTDATED_REL_PTR = INVALID_REL_PTR + 1;
+
+    /**
+     * Offset of the page lock inside the page header: 8b marker/timestamp + 8b relative pointer + 8b page ID + 4b group ID
+     * + 4b pin count.
+     */
+    public static final int PAGE_LOCK_OFFSET = 32;
+
+    /** Page header size: 8b Marker/timestamp 8b Relative pointer 8b Page ID 4b Group ID 4b Pin count 8b Lock 8b Temporary buffer. */
+    public static final int PAGE_OVERHEAD = 48;
+
+    /** Try again tag. */
+    public static final int TRY_AGAIN_TAG = -1;
+
+    /** Data region configuration view. */
+    private final PageMemoryDataRegionView dataRegionCfg;
+
+    /** Page IO registry. */
+    private final PageIoRegistry ioRegistry;
+
+    /** Page manager; used to allocate page IDs in {@code allocatePage}. */
+    private final PageReadWriteManager pmPageMgr;
+
+    /** System page size: the configured page size plus {@link #PAGE_OVERHEAD} bytes of header. */
+    private final int sysPageSize;
+
+    /** Page replacement policy factory, chosen from the configured replacement mode. */
+    private final PageReplacementPolicyFactory pageReplacementPolicyFactory;
+
+    /** Direct memory allocator. */
+    private final DirectMemoryProvider directMemoryProvider;
+
+    /** Segments array; assigned in {@code start()}. */
+    private volatile Segment[] segments;
+
+    /** Monitor guarding {@code start()} and {@code stop()}, i.e. changes of {@link #segments} and {@link #started}. */
+    private final Object segmentsLock = new Object();
+
+    /** Off-heap read-write lock; each page's lock word lives at {@link #PAGE_LOCK_OFFSET} in the page header. */
+    private final OffheapReadWriteLock rwLock;
+
+    /** Callback invoked to track changes in pages. {@code Null} if page tracking functionality is disabled. */
+    @Nullable
+    private final PageChangeTracker changeTracker;
+
+    /** Atomic updater for the {@link #pageReplacementWarned} field. */
+    private static final AtomicIntegerFieldUpdater<PageMemoryImpl> pageReplacementWarnedFieldUpdater =
+            AtomicIntegerFieldUpdater.newUpdater(PageMemoryImpl.class, "pageReplacementWarned");
+
+    /** Flag indicating page replacement started (rotation with disk), allocating new page requires freeing old one. */
+    private volatile int pageReplacementWarned;
+
+    /** Region sizes passed to the direct memory provider in {@code start()}. */
+    private final long[] sizes;
+
+    /** {@code False} if memory was not started or already stopped and is not supposed for any usage. */
+    private volatile boolean started;
+
+    /**
+     * Creates the page memory. No memory is allocated here; that happens in {@code start()}.
+     *
+     * @param directMemoryProvider Memory allocator to use.
+     * @param dataRegionCfg Data region configuration.
+     * @param ioRegistry IO registry.
+     * @param sizes Segments sizes.
+     * @param pmPageMgr Page store manager.
+     * @param changeTracker Callback invoked to track changes in pages.
+     * @throws IgniteInternalException If the configured replacement mode is not recognized.
+     */
+    public PageMemoryImpl(
+            DirectMemoryProvider directMemoryProvider,
+            PageMemoryDataRegionConfiguration dataRegionCfg,
+            PageIoRegistry ioRegistry,
+            long[] sizes,
+            PageReadWriteManager pmPageMgr,
+            @Nullable PageChangeTracker changeTracker
+    ) {
+        PageMemoryDataRegionView cfgView = (PageMemoryDataRegionView) dataRegionCfg.value();
+
+        this.directMemoryProvider = directMemoryProvider;
+        this.dataRegionCfg = cfgView;
+        this.ioRegistry = ioRegistry;
+        this.sizes = sizes;
+        this.pmPageMgr = pmPageMgr;
+        this.changeTracker = changeTracker;
+
+        // Every page in memory is prefixed with a fixed-size header.
+        sysPageSize = cfgView.pageSize() + PAGE_OVERHEAD;
+
+        rwLock = new OffheapReadWriteLock(128);
+
+        pageReplacementPolicyFactory = createReplacementPolicyFactory(cfgView.replacementMode());
+    }
+
+    /**
+     * Maps a configured replacement mode name to the corresponding policy factory.
+     *
+     * @param replacementMode Replacement mode taken from the data region configuration.
+     * @return Page replacement policy factory.
+     * @throws IgniteInternalException If the mode is not recognized.
+     */
+    private PageReplacementPolicyFactory createReplacementPolicyFactory(String replacementMode) {
+        switch (replacementMode) {
+            case RANDOM_LRU_REPLACEMENT_MODE:
+                return new RandomLruPageReplacementPolicyFactory();
+            case SEGMENTED_LRU_REPLACEMENT_MODE:
+                return new SegmentedLruPageReplacementPolicyFactory();
+            case CLOCK_REPLACEMENT_MODE:
+                return new ClockPageReplacementPolicyFactory();
+            default:
+                throw new IgniteInternalException("Unexpected page replacement mode: " + replacementMode);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Requests the configured regions from the direct memory provider. Every region except the last one backs a {@link Segment};
+     * the size of the last one is reported as the checkpoint buffer. Calling this method on an already started instance does nothing.
+     *
+     * @throws IgniteInternalException If the memory provider supplied no regions at all.
+     */
+    @Override
+    public void start() throws IgniteInternalException {
+        synchronized (segmentsLock) {
+            if (started) {
+                return;
+            }
+
+            started = true;
+
+            directMemoryProvider.initialize(sizes);
+
+            List<DirectMemoryRegion> regions = new ArrayList<>(sizes.length);
+
+            while (true) {
+                DirectMemoryRegion reg = directMemoryProvider.nextRegion();
+
+                if (reg == null) {
+                    break;
+                }
+
+                regions.add(reg);
+            }
+
+            int regs = regions.size();
+
+            // The last region is reserved for the checkpoint buffer, so at least one region is required. Without this check an
+            // empty provider fails below with an unhelpful NegativeArraySizeException.
+            if (regs == 0) {
+                throw new IgniteInternalException("No direct memory regions were allocated [requestedSizes=" + sizes.length + ']');
+            }
+
+            // Not named "segments" to avoid shadowing the volatile field it is published to below.
+            Segment[] newSegments = new Segment[regs - 1];
+
+            DirectMemoryRegion cpReg = regions.get(regs - 1);
+
+            long checkpointBuf = cpReg.size();
+
+            long totalAllocated = 0;
+            int pages = 0;
+            long totalTblSize = 0;
+            long totalReplSize = 0;
+
+            for (int i = 0; i < newSegments.length; i++) {
+                DirectMemoryRegion reg = regions.get(i);
+
+                totalAllocated += reg.size();
+
+                newSegments[i] = new Segment(i, reg);
+
+                pages += newSegments[i].pages();
+                totalTblSize += newSegments[i].tableSize();
+                totalReplSize += newSegments[i].replacementSize();
+            }
+
+            // Publish the fully initialized array through the volatile field.
+            segments = newSegments;
+
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Started page memory [memoryAllocated=" + readableSize(totalAllocated, false)
+                        + ", pages=" + pages
+                        + ", tableSize=" + readableSize(totalTblSize, false)
+                        + ", replacementSize=" + readableSize(totalReplSize, false)
+                        + ", checkpointBuffer=" + readableSize(checkpointBuf, false)
+                        + ']');
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop(boolean deallocate) throws IgniteInternalException {
+        synchronized (segmentsLock) {
+            if (!started) {
+                return;
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Stopping page memory.");
+            }
+
+            if (segments != null) {
+                for (Segment seg : segments) {
+                    seg.close();
+                }
+            }
+
+            started = false;
+
+            directMemoryProvider.shutdown(deallocate);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>The release is performed while holding the read lock of the segment that owns the page.
+     */
+    @Override
+    public void releasePage(int grpId, long pageId, long page) {
+        assert started;
+
+        Segment pageSeg = segment(grpId, pageId);
+
+        pageSeg.readLock().lock();
+
+        try {
+            pageSeg.releasePage(page);
+        } finally {
+            pageSeg.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long readLock(int grpId, long pageId, long page) {
+        assert started;
+
+        return readLock(page, pageId, false);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Returns {@code 0} if the read lock could not be acquired; otherwise returns the address of the page data, which starts
+     * just past the page header. When {@code force} is {@code true} the lock is requested with {@code TAG_LOCK_ALWAYS} instead of
+     * the tag of {@code pageId}. When {@code touch} is {@code true} the page access timestamp is refreshed.
+     */
+    @Override
+    public long readLock(long absPtr, long pageId, boolean force, boolean touch) {
+        assert started;
+
+        // Use the named constant rather than a bare -1 literal. This makes the forced-lock intent explicit and matches the
+        // write-lock path, which already uses TAG_LOCK_ALWAYS.
+        int tag = force ? TAG_LOCK_ALWAYS : tag(pageId);
+
+        boolean locked = rwLock.readLock(absPtr + PAGE_LOCK_OFFSET, tag);
+
+        if (!locked) {
+            return 0;
+        }
+
+        if (touch) {
+            writeTimestamp(absPtr, coarseCurrentTimeMillis());
+        }
+
+        assert getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO IGNITE-16612
+
+        return absPtr + PAGE_OVERHEAD;
+    }
+
+    /**
+     * Read-locks the page and always refreshes its access timestamp.
+     *
+     * @param absPtr Absolute page pointer.
+     * @param pageId Page ID.
+     * @param force Whether to lock regardless of the page tag.
+     * @return Page data address, or {@code 0} if the lock was not acquired.
+     */
+    private long readLock(long absPtr, long pageId, boolean force) {
+        boolean touch = true;
+
+        return readLock(absPtr, pageId, force, touch);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void readUnlock(int grpId, long pageId, long page) {
+        assert started;
+
+        readUnlockPage(page);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long writeLock(int grpId, long pageId, long page) {
+        assert started;
+
+        return writeLock(grpId, pageId, page, false);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Delegates to {@code writeLockPage}, passing the negation of {@code restore} as its last argument.
+     */
+    @Override
+    public long writeLock(int grpId, long pageId, long page, boolean restore) {
+        assert started;
+
+        FullPageId fullId = new FullPageId(pageId, grpId);
+
+        return writeLockPage(page, fullId, !restore);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Attempts the write lock via {@code tryWriteLockPage}.
+     */
+    @Override
+    public long tryWriteLock(int grpId, long pageId, long page) {
+        assert started;
+
+        FullPageId fullId = new FullPageId(pageId, grpId);
+
+        return tryWriteLockPage(page, fullId, true);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeUnlock(int grpId, long pageId, long page, boolean dirtyFlag) {
+        assert started;
+
+        writeUnlock(grpId, pageId, page, dirtyFlag, false);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Delegates to {@code writeUnlockPage} with the full page ID built from {@code grpId} and {@code pageId}.
+     */
+    @Override
+    public void writeUnlock(int grpId, long pageId, long page, boolean dirtyFlag, boolean restore) {
+        assert started;
+
+        FullPageId fullId = new FullPageId(pageId, grpId);
+
+        writeUnlockPage(page, fullId, dirtyFlag, restore);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isDirty(int grpId, long pageId, long page) {
+        assert started;
+
+        return isDirty(page);
+    }
+
+    /**
+     * Reads the dirty flag from the page header.
+     *
+     * @param absPtr Absolute page pointer.
+     * @return {@code true} if the page is marked dirty.
+     */
+    boolean isDirty(long absPtr) {
+        boolean pageDirty = dirty(absPtr);
+
+        return pageDirty;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Besides allocating the page ID through the page manager, this places a zero-filled page marked dirty into memory. This is
+     * done under the write lock of the owning segment.
+     *
+     * @throws IgniteOutOfMemoryException If no free or replaceable page slot is available. The original exception is attached as
+     *      the cause of a new one whose message describes the data region.
+     */
+    @Override
+    public long allocatePage(int grpId, int partId, byte flags) throws IgniteInternalCheckedException {
+        assert partId <= MAX_PARTITION_ID || partId == INDEX_PARTITION && flags == FLAG_AUX : "flags = " + flags + ", partId = " + partId;
+
+        assert started;
+
+        long pageId = pmPageMgr.allocatePage(grpId, partId, flags);
+
+        assert pageIndex(pageId) > 0; //it's crucial for tracking pages (zero page is super one)
+
+        // We need to allocate page in memory for marking it dirty to save it in the next checkpoint.
+        // Otherwise it is possible that on file will be empty page which will be saved at snapshot and read with error
+        // because there is no crc inside them.
+        Segment seg = segment(grpId, pageId);
+
+        seg.writeLock().lock();
+
+        FullPageId fullId = new FullPageId(pageId, grpId);
+
+        try {
+            long relPtr = seg.loadedPages.get(
+                    grpId,
+                    effectivePageId(pageId),
+                    seg.partGeneration(grpId, partId),
+                    INVALID_REL_PTR,
+                    OUTDATED_REL_PTR
+            );
+
+            // Outdated mapping: take its slot via refreshOutdatedPage and drop it from the replacement policy.
+            // The slot is registered with the policy again by onMiss below.
+            if (relPtr == OUTDATED_REL_PTR) {
+                relPtr = seg.refreshOutdatedPage(grpId, pageId, false);
+
+                seg.pageReplacementPolicy.onRemove(relPtr);
+            }
+
+            // No slot yet: take a free page from the segment...
+            if (relPtr == INVALID_REL_PTR) {
+                relPtr = seg.borrowOrAllocateFreePage(pageId);
+            }
+
+            // ...or, as a last resort, free one through page replacement.
+            if (relPtr == INVALID_REL_PTR) {
+                relPtr = seg.removePageForReplacement();
+            }
+
+            long absPtr = seg.absolute(relPtr);
+
+            // A newly allocated page starts zero-filled; only the header is written below.
+            setMemory(absPtr + PAGE_OVERHEAD, pageSize(), (byte) 0);
+
+            fullPageId(absPtr, fullId);
+            writeTimestamp(absPtr, coarseCurrentTimeMillis());
+            rwLock.init(absPtr + PAGE_LOCK_OFFSET, tag(pageId));
+
+            assert getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO IGNITE-16612
+
+            assert !isAcquired(absPtr) :
+                    "Pin counter must be 0 for a new page [relPtr=" + hexLong(relPtr)
+                            + ", absPtr=" + hexLong(absPtr) + ", pinCntr=" + PageHeader.pinCount(absPtr) + ']';
+
+            // Marked dirty so that the next checkpoint writes the page (see the comment before the segment lookup).
+            setDirty(fullId, absPtr, true, true);
+
+            seg.pageReplacementPolicy.onMiss(relPtr);
+
+            // Publish the mapping for the current partition generation so later lookups find this slot.
+            seg.loadedPages.put(grpId, effectivePageId(pageId), relPtr, seg.partGeneration(grpId, partId));
+        } catch (IgniteOutOfMemoryException oom) {
+            // Re-throw with a message describing the data region and possible remedies, keeping the original as the cause.
+            IgniteOutOfMemoryException e = new IgniteOutOfMemoryException("Out of memory in data region ["
+                    + "name=" + dataRegionCfg.name()
+                    + ", initSize=" + readableSize(dataRegionCfg.initSize(), false)
+                    + ", maxSize=" + readableSize(dataRegionCfg.maxSize(), false)
+                    + ", persistenceEnabled=" + dataRegionCfg.persistent() + "] Try the following:" + lineSeparator()
+                    + "  ^-- Increase maximum off-heap memory size (PageMemoryDataRegionConfiguration.maxSize)" + lineSeparator()
+                    + "  ^-- Enable eviction or expiration policies"
+            );
+
+            e.initCause(oom);
+
+            throw e;
+        } finally {
+            seg.writeLock().unlock();
+        }
+
+        return pageId;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ByteBuffer pageBuffer(long pageAddr) {
+        return wrapPointer(pageAddr, pageSize());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean freePage(int grpId, long pageId) {
+        assert false : "Free page should be never called directly when persistence is enabled.";
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long partitionMetaPageId(int grpId, int partId) {
+        assert started;
+
+        return pageId(partId, FLAG_DATA, 0);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long acquirePage(int grpId, long pageId) throws IgniteInternalCheckedException {
+        return acquirePage(grpId, pageId, IoStatisticsHolderNoOp.INSTANCE, false);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long acquirePage(int grpId, long pageId, IoStatisticsHolder statHolder) throws IgniteInternalCheckedException {
+        assert started;
+
+        return acquirePage(grpId, pageId, statHolder, false);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long acquirePage(int grpId, long pageId, AtomicBoolean pageAllocated) throws IgniteInternalCheckedException {
+        return acquirePage(grpId, pageId, IoStatisticsHolderNoOp.INSTANCE, false, pageAllocated);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long acquirePage(int grpId, long pageId, IoStatisticsHolder statHolder, boolean restore) throws IgniteInternalCheckedException {
+        return acquirePage(grpId, pageId, statHolder, restore, null);
+    }
+
+    private long acquirePage(
+            int grpId,
+            long pageId,
+            IoStatisticsHolder statHolder,
+            boolean restore,
+            @Nullable AtomicBoolean pageAllocated
+    ) throws IgniteInternalCheckedException {
+        assert started;
+
+        int partId = partitionId(pageId);
+
+        Segment seg = segment(grpId, pageId);
+
+        seg.readLock().lock();
+
+        try {
+            long relPtr = seg.loadedPages.get(
+                    grpId,
+                    effectivePageId(pageId),
+                    seg.partGeneration(grpId, partId),
+                    INVALID_REL_PTR,
+                    INVALID_REL_PTR
+            );
+
+            // The page is loaded to the memory.
+            if (relPtr != INVALID_REL_PTR) {
+                long absPtr = seg.absolute(relPtr);
+
+                seg.acquirePage(absPtr);
+
+                seg.pageReplacementPolicy.onHit(relPtr);
+
+                statHolder.trackLogicalRead(absPtr + PAGE_OVERHEAD);
+
+                return absPtr;
+            }
+        } finally {
+            seg.readLock().unlock();
+        }
+
+        FullPageId fullId = new FullPageId(pageId, grpId);
+
+        seg.writeLock().lock();
+
+        long lockedPageAbsPtr = -1;
+        boolean readPageFromStore = false;
+
+        try {
+            // Double-check.
+            long relPtr = seg.loadedPages.get(
+                    grpId,
+                    fullId.effectivePageId(),
+                    seg.partGeneration(grpId, partId),
+                    INVALID_REL_PTR,
+                    OUTDATED_REL_PTR
+            );
+
+            long absPtr;
+
+            if (relPtr == INVALID_REL_PTR) {
+                relPtr = seg.borrowOrAllocateFreePage(pageId);
+
+                if (pageAllocated != null) {
+                    pageAllocated.set(true);
+                }
+
+                if (relPtr == INVALID_REL_PTR) {
+                    relPtr = seg.removePageForReplacement();
+                }
+
+                absPtr = seg.absolute(relPtr);
+
+                fullPageId(absPtr, fullId);
+                writeTimestamp(absPtr, coarseCurrentTimeMillis());
+
+                assert !isAcquired(absPtr) :
+                        "Pin counter must be 0 for a new page [relPtr=" + hexLong(relPtr) + ", absPtr=" + hexLong(absPtr) + ']';
+
+                // We can clear dirty flag after the page has been allocated.
+                setDirty(fullId, absPtr, false, false);
+
+                seg.pageReplacementPolicy.onMiss(relPtr);
+
+                seg.loadedPages.put(
+                        grpId,
+                        fullId.effectivePageId(),
+                        relPtr,
+                        seg.partGeneration(grpId, partId)
+                );
+
+                long pageAddr = absPtr + PAGE_OVERHEAD;
+
+                if (!restore) {
+                    readPageFromStore = true;
+                } else {
+                    setMemory(absPtr + PAGE_OVERHEAD, pageSize(), (byte) 0);
+
+                    // Must init page ID in order to ensure RWLock tag consistency.
+                    setPageId(pageAddr, pageId);
+                }
+
+                rwLock.init(absPtr + PAGE_LOCK_OFFSET, tag(pageId));
+
+                if (readPageFromStore) {
+                    boolean locked = rwLock.writeLock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS);
+
+                    assert locked : "Page ID " + fullId + " expected to be locked";
+
+                    lockedPageAbsPtr = absPtr;
+                }
+            } else if (relPtr == OUTDATED_REL_PTR) {
+                assert pageIndex(pageId) == 0 : fullId;
+
+                relPtr = seg.refreshOutdatedPage(grpId, pageId, false);
+
+                absPtr = seg.absolute(relPtr);
+
+                long pageAddr = absPtr + PAGE_OVERHEAD;
+
+                setMemory(pageAddr, pageSize(), (byte) 0);
+
+                fullPageId(absPtr, fullId);
+                writeTimestamp(absPtr, coarseCurrentTimeMillis());
+                setPageId(pageAddr, pageId);
+
+                assert !isAcquired(absPtr) :
+                        "Pin counter must be 0 for a new page [relPtr=" + hexLong(relPtr) + ", absPtr=" + hexLong(absPtr) + ']';
+
+                rwLock.init(absPtr + PAGE_LOCK_OFFSET, tag(pageId));
+
+                seg.pageReplacementPolicy.onRemove(relPtr);
+                seg.pageReplacementPolicy.onMiss(relPtr);
+            } else {
+                absPtr = seg.absolute(relPtr);
+
+                seg.pageReplacementPolicy.onHit(relPtr);
+            }
+
+            seg.acquirePage(absPtr);
+
+            if (!readPageFromStore) {
+                statHolder.trackLogicalRead(absPtr + PAGE_OVERHEAD);
+            }
+
+            return absPtr;
+        } catch (IgniteOutOfMemoryException oom) {
+            throw oom;
+        } finally {
+            seg.writeLock().unlock();
+
+            if (readPageFromStore) {
+                assert lockedPageAbsPtr != -1 : "Page is expected to have a valid address [pageId=" + fullId
+                        + ", lockedPageAbsPtr=" + hexLong(lockedPageAbsPtr) + ']';
+
+                assert isPageWriteLocked(lockedPageAbsPtr) : "Page is expected to be locked: [pageId=" + fullId + "]";
+
+                long pageAddr = lockedPageAbsPtr + PAGE_OVERHEAD;
+
+                ByteBuffer buf = wrapPointer(pageAddr, pageSize());
+
+                long actualPageId = 0;
+
+                try {
+                    pmPageMgr.read(grpId, pageId, buf, false);
+
+                    statHolder.trackPhysicalAndLogicalRead(pageAddr);
+
+                    actualPageId = getPageId(buf);
+                } finally {
+                    rwLock.writeUnlock(lockedPageAbsPtr + PAGE_LOCK_OFFSET, actualPageId == 0 ? TAG_LOCK_ALWAYS : tag(actualPageId));
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int pageSize() {
+        return sysPageSize - PAGE_OVERHEAD;
+    }
+
+    /**
+     * Returns the full in-memory page size, including the {@code PAGE_OVERHEAD} header bytes.
+     */
+    @Override
+    public int systemPageSize() {
+        return this.sysPageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int realPageSize(int grpId) {
+        return pageSize();

Review comment:
       Already in **PageMemory#realPageSize**




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] tkalkirill commented on pull request #667: IGNITE-16560 [Native Persistence 3.0] PageMemoryImpl porting

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on pull request #667:
URL: https://github.com/apache/ignite-3/pull/667#issuecomment-1055543899


   @ibessonov Please review again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] ibessonov merged pull request #667: IGNITE-16560 [Native Persistence 3.0] PageMemoryImpl porting

Posted by GitBox <gi...@apache.org>.
ibessonov merged pull request #667:
URL: https://github.com/apache/ignite-3/pull/667


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] ibessonov commented on a change in pull request #667: IGNITE-16560 [Native Persistence 3.0] PageMemoryImpl porting

Posted by GitBox <gi...@apache.org>.
ibessonov commented on a change in pull request #667:
URL: https://github.com/apache/ignite-3/pull/667#discussion_r816798762



##########
File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/GroupPartitionId.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Pair of group ID with partition ID. Immutable, comparable class, may be used as key in maps.
+ */
+public class GroupPartitionId implements Comparable<GroupPartitionId> {
+    /**
+     * Index of the partition super (meta) page; per the original note, every iterated cache partition has such a page.
+     * NOTE(review): not referenced in the visible part of this class — confirm it is still used.
+     */
+    private static final int METAPAGE_IDX = 0;
+
+    /** Group ID. */
+    private final int grpId;
+
+    /** Partition ID within the group. */
+    private final int partId;
+
+    /**
+     * Creates an immutable (group, partition) pair.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     */
+    public GroupPartitionId(int grpId, int partId) {
+        this.partId = partId;
+        this.grpId = grpId;
+    }
+
+    /**
+     * Chooses the page flag for a partition: {@code FLAG_AUX} for the index partition, {@code FLAG_DATA} otherwise.
+     *
+     * @param partId Partition ID.
+     * @return Page flag.
+     */
+    public static byte getFlagByPartId(int partId) {
+        if (partId == INDEX_PARTITION) {
+            return FLAG_AUX;
+        }
+
+        return FLAG_DATA;
+    }
+
+    /**
+     * Returns the group ID of this pair.
+     */
+    public int getGroupId() {
+        return this.grpId;
+    }
+
+    /**
+     * Returns the partition ID of this pair.
+     */
+    public int getPartitionId() {
+        return this.partId;
+    }
+
+    /**
+     * Returns a string representation produced by the {@code S} helper.
+     */
+    @Override
+    public String toString() {
+        return S.toString(GroupPartitionId.class, this);
+    }
+
+    /**
+     * Two instances are equal when they have exactly the same class and the same group and partition IDs.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+
+        if (o != null && o.getClass() == getClass()) {
+            GroupPartitionId other = (GroupPartitionId) o;
+
+            return grpId == other.grpId && partId == other.partId;
+        }
+
+        return false;
+    }
+
+    /**
+     * Combines the group and partition IDs using the conventional 31-multiplier scheme.
+     */
+    @Override
+    public int hashCode() {
+        return 31 * grpId + partId;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int compareTo(GroupPartitionId o) {
+        if (getGroupId() < o.getGroupId()) {
+            return -1;
+        }
+
+        if (getGroupId() > o.getGroupId()) {
+            return 1;
+        }
+
+        if (getPartitionId() < o.getPartitionId()) {
+            return -1;
+        }
+
+        if (getPartitionId() > o.getPartitionId()) {
+            return 1;
+        }
+
+        return 0;
+    }
+
+    /**
+     * Returns page ID (64 bits) constructed from partition ID and given index.
+     *
+     * @param pageIdx Page Index, monotonically growing number within each partition.
+     */
+    private long createPageId(final int pageIdx) {
+        final int partId = getPartitionId();
+
+        return pageId(partId, getFlagByPartId(partId), pageIdx);

Review comment:
       This looks weird as well

##########
File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/GroupPartitionId.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Pair of group ID with partition ID. Immutable, comparable class, may be used as key in maps.
+ */
+public class GroupPartitionId implements Comparable<GroupPartitionId> {
+    /**
+     * Index of the partition super (meta) page; per the original note, every iterated cache partition has such a page.
+     * NOTE(review): not referenced in the visible part of this class — confirm it is still used.
+     */
+    private static final int METAPAGE_IDX = 0;
+
+    /** Group ID. */
+    private final int grpId;
+
+    /** Partition ID within the group. */
+    private final int partId;
+
+    /**
+     * Creates an immutable (group, partition) pair.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     */
+    public GroupPartitionId(int grpId, int partId) {
+        this.partId = partId;
+        this.grpId = grpId;
+    }
+
+    /**
+     * Chooses the page flag for a partition: {@code FLAG_AUX} for the index partition, {@code FLAG_DATA} otherwise.
+     *
+     * @param partId Partition ID.
+     * @return Page flag.
+     */
+    public static byte getFlagByPartId(int partId) {
+        if (partId == INDEX_PARTITION) {
+            return FLAG_AUX;
+        }
+
+        return FLAG_DATA;
+    }
+
+    /**
+     * Returns the group ID of this pair.
+     */
+    public int getGroupId() {
+        return this.grpId;
+    }
+
+    /**
+     * Returns the partition ID of this pair.
+     */
+    public int getPartitionId() {
+        return this.partId;
+    }
+
+    /**
+     * Returns a string representation produced by the {@code S} helper.
+     */
+    @Override
+    public String toString() {
+        return S.toString(GroupPartitionId.class, this);
+    }
+
+    /**
+     * Two instances are equal when they have exactly the same class and the same group and partition IDs.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+
+        if (o != null && o.getClass() == getClass()) {
+            GroupPartitionId other = (GroupPartitionId) o;
+
+            return grpId == other.grpId && partId == other.partId;
+        }
+
+        return false;
+    }
+
+    /**
+     * Combines the group and partition IDs using the conventional 31-multiplier scheme.
+     */
+    @Override
+    public int hashCode() {
+        return 31 * grpId + partId;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int compareTo(GroupPartitionId o) {
+        if (getGroupId() < o.getGroupId()) {
+            return -1;
+        }
+
+        if (getGroupId() > o.getGroupId()) {
+            return 1;
+        }
+
+        if (getPartitionId() < o.getPartitionId()) {
+            return -1;
+        }
+
+        if (getPartitionId() > o.getPartitionId()) {
+            return 1;
+        }
+
+        return 0;
+    }
+
+    /**
+     * Builds a 64-bit page ID from this partition ID, the partition's page flag and the given page index.
+     *
+     * @param pageIdx Page index, a monotonically growing number within each partition.
+     * @return Page ID.
+     */
+    private long createPageId(int pageIdx) {
+        int part = getPartitionId();
+        byte flag = getFlagByPartId(part);
+
+        return pageId(part, flag, pageIdx);
+    }
+
+    /**
+     * Returns Full page ID. For index 0 will return super-page of next partition
+     *
+     * @param pageIdx Page Index, monotonically growing number within each partition
+     * @return FullPageId consists of cache ID (32 bits) and page ID (64 bits).
+     */
+    private FullPageId createFullPageId(final int pageIdx) {
+        return new FullPageId(createPageId(pageIdx), getGroupId());

Review comment:
       If it's all just for meta page id, then we could just hardcode these expressions into "createFirstPageFullId", if it's even used

##########
File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java
##########
@@ -0,0 +1,1602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static java.lang.System.lineSeparator;
+import static org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema.CLOCK_REPLACEMENT_MODE;
+import static org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema.RANDOM_LRU_REPLACEMENT_MODE;
+import static org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema.SEGMENTED_LRU_REPLACEMENT_MODE;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getCrc;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getPageId;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getType;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getVersion;
+import static org.apache.ignite.internal.pagememory.io.PageIo.setPageId;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.dirty;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.fullPageId;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.isAcquired;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.tempBufferPointer;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.writeTimestamp;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.effectivePageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageIndex;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.tag;
+import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
+import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapInt;
+import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapLong;
+import static org.apache.ignite.internal.util.GridUnsafe.copyMemory;
+import static org.apache.ignite.internal.util.GridUnsafe.getInt;
+import static org.apache.ignite.internal.util.GridUnsafe.putIntVolatile;
+import static org.apache.ignite.internal.util.GridUnsafe.setMemory;
+import static org.apache.ignite.internal.util.GridUnsafe.wrapPointer;
+import static org.apache.ignite.internal.util.IgniteUtils.hash;
+import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
+import static org.apache.ignite.internal.util.IgniteUtils.readableSize;
+import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
+import static org.apache.ignite.internal.util.IgniteUtils.toHexString;
+import static org.apache.ignite.internal.util.OffheapReadWriteLock.TAG_LOCK_ALWAYS;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionView;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.pagememory.mem.DirectMemoryProvider;
+import org.apache.ignite.internal.pagememory.mem.DirectMemoryRegion;
+import org.apache.ignite.internal.pagememory.mem.IgniteOutOfMemoryException;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.OffheapReadWriteLock;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Page header structure is described by the following diagram.
+ *
+ * <p>When page is not allocated (in a free list):
+ * <pre>
+ * +--------+------------------------------------------------------+
+ * |8 bytes |         PAGE_SIZE + PAGE_OVERHEAD - 8 bytes          |
+ * +--------+------------------------------------------------------+
+ * |Next ptr|                      Page data                       |
+ * +--------+------------------------------------------------------+
+ * </pre>
+ *
+ * <p>When page is allocated and is in use:
+ * <pre>
+ * +------------------+--------+--------+----+----+--------+--------+----------------------+
+ * |     8 bytes      |8 bytes |8 bytes |4 b |4 b |8 bytes |8 bytes |       PAGE_SIZE      |
+ * +------------------+--------+--------+----+----+--------+--------+----------------------+
+ * | Marker/Timestamp |Rel ptr |Page ID |C ID|PIN | LOCK   |TMP BUF |       Page data      |
+ * +------------------+--------+--------+----+----+--------+--------+----------------------+
+ * </pre>
+ *
+ * <p>Note that first 8 bytes of page header are used either for page marker or for next relative pointer depending on whether the page is
+ * in use or not.
+ */
+@SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
+public class PageMemoryImpl implements PageMemoryEx {
+    /** Logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(PageMemoryImpl.class);
+
+    /** Mask covering the 56 bits of a relative page pointer. */
+    public static final long RELATIVE_PTR_MASK = 0xFFFFFFFFFFFFFFL;
+
+    /** Relative pointer value meaning "no such page" (all 56 pointer bits set). */
+    static final long INVALID_REL_PTR = RELATIVE_PTR_MASK;
+
+    /**
+     * Relative pointer value meaning that the loaded page is outdated (for example, the group was destroyed or the partition
+     * was evicted). It lies just above the relative pointer range, so it can never clash with a real pointer.
+     */
+    static final long OUTDATED_REL_PTR = INVALID_REL_PTR + 1;
+
+    /** Offset of the lock word inside the page header: 8b marker/timestamp + 8b rel ptr + 8b page ID + 4b group ID + 4b pin count. */
+    public static final int PAGE_LOCK_OFFSET = 32;
+
+    /** Page header size: 8b marker/timestamp, 8b relative pointer, 8b page ID, 4b group ID, 4b pin count, 8b lock, 8b temp buffer. */
+    public static final int PAGE_OVERHEAD = 48;
+
+    /** Try again tag. NOTE(review): not referenced in the visible part of this class — semantics to be confirmed. */
+    public static final int TRY_AGAIN_TAG = -1;
+
+    /** Data region configuration view. */
+    private final PageMemoryDataRegionView dataRegionCfg;
+
+    /** Page IO registry. */
+    private final PageIoRegistry ioRegistry;
+
+    /** Page manager used to allocate page IDs and read pages from storage. */
+    private final PageReadWriteManager pmPageMgr;
+
+    /** System page size: configured page size plus {@link #PAGE_OVERHEAD}. */
+    private final int sysPageSize;
+
+    /** Page replacement policy factory selected from the configured replacement mode. */
+    private final PageReplacementPolicyFactory pageReplacementPolicyFactory;
+
+    /** Direct memory allocator. */
+    private final DirectMemoryProvider directMemoryProvider;
+
+    /** Page memory segments; assigned in {@code start()} while holding {@link #segmentsLock}. */
+    private volatile Segment[] segments;
+
+    /** Lock guarding start/stop and segment changes. */
+    private final Object segmentsLock = new Object();
+
+    /** Off-heap read-write lock used for page locks. */
+    private final OffheapReadWriteLock rwLock;
+
+    /** Callback invoked to track changes in pages. {@code Null} if page tracking functionality is disabled. */
+    @Nullable
+    private final PageChangeTracker changeTracker;
+
+    /** Atomic updater for {@link #pageReplacementWarned}. */
+    private static final AtomicIntegerFieldUpdater<PageMemoryImpl> pageReplacementWarnedFieldUpdater =
+            AtomicIntegerFieldUpdater.newUpdater(PageMemoryImpl.class, "pageReplacementWarned");
+
+    /** Flag indicating that page replacement (rotation with disk) has started, so allocating a new page requires freeing an old one. */
+    private volatile int pageReplacementWarned;
+
+    /** Segment sizes passed to the direct memory provider on start. */
+    private final long[] sizes;
+
+    /** {@code False} if the memory was not started or was already stopped and must not be used. */
+    private volatile boolean started;
+
+    /**
+     * Creates a persistent page memory instance. No memory is allocated here; that happens in {@code start()}.
+     *
+     * @param directMemoryProvider Memory allocator to use.
+     * @param dataRegionCfg Data region configuration.
+     * @param ioRegistry IO registry.
+     * @param sizes Segments sizes.
+     * @param pmPageMgr Page store manager.
+     * @param changeTracker Callback invoked to track changes in pages.
+     * @throws IgniteInternalException If the configured page replacement mode is unknown.
+     */
+    public PageMemoryImpl(
+            DirectMemoryProvider directMemoryProvider,
+            PageMemoryDataRegionConfiguration dataRegionCfg,
+            PageIoRegistry ioRegistry,
+            long[] sizes,
+            PageReadWriteManager pmPageMgr,
+            @Nullable PageChangeTracker changeTracker
+    ) {
+        PageMemoryDataRegionView view = (PageMemoryDataRegionView) dataRegionCfg.value();
+
+        this.directMemoryProvider = directMemoryProvider;
+        this.dataRegionCfg = view;
+        this.ioRegistry = ioRegistry;
+        this.sizes = sizes;
+        this.pmPageMgr = pmPageMgr;
+        this.changeTracker = changeTracker;
+
+        // Every page in memory also carries the fixed-size header.
+        this.sysPageSize = view.pageSize() + PAGE_OVERHEAD;
+
+        this.rwLock = new OffheapReadWriteLock(128);
+
+        this.pageReplacementPolicyFactory = createPageReplacementPolicyFactory(view.replacementMode());
+    }
+
+    /**
+     * Maps a configured replacement mode name to its page replacement policy factory.
+     *
+     * @param replacementMode Replacement mode from the data region configuration.
+     * @return Page replacement policy factory.
+     * @throws IgniteInternalException If the mode is unknown.
+     */
+    private PageReplacementPolicyFactory createPageReplacementPolicyFactory(String replacementMode) {
+        switch (replacementMode) {
+            case RANDOM_LRU_REPLACEMENT_MODE:
+                return new RandomLruPageReplacementPolicyFactory();
+            case SEGMENTED_LRU_REPLACEMENT_MODE:
+                return new SegmentedLruPageReplacementPolicyFactory();
+            case CLOCK_REPLACEMENT_MODE:
+                return new ClockPageReplacementPolicyFactory();
+            default:
+                throw new IgniteInternalException("Unexpected page replacement mode: " + replacementMode);
+        }
+    }
+
+    /**
+     * Allocates the direct memory regions and splits them into page segments.
+     *
+     * <p>Every region returned by the provider except the last one becomes a segment. The last region is not turned into a
+     * segment; its size is logged as the checkpoint buffer. The memory is marked as started only after the segments have been
+     * created and published. A failed initialization therefore does not leave {@code started == true} with no segments.
+     *
+     * @throws IgniteInternalException If the direct memory provider returned no regions.
+     */
+    @Override
+    public void start() throws IgniteInternalException {
+        synchronized (segmentsLock) {
+            if (started) {
+                return;
+            }
+
+            directMemoryProvider.initialize(sizes);
+
+            List<DirectMemoryRegion> regions = new ArrayList<>(sizes.length);
+
+            while (true) {
+                DirectMemoryRegion reg = directMemoryProvider.nextRegion();
+
+                if (reg == null) {
+                    break;
+                }
+
+                regions.add(reg);
+            }
+
+            int regs = regions.size();
+
+            if (regs == 0) {
+                throw new IgniteInternalException("Direct memory provider returned no memory regions");
+            }
+
+            // The last region is reserved (checkpoint buffer); all the others become page segments.
+            Segment[] newSegments = new Segment[regs - 1];
+
+            DirectMemoryRegion cpReg = regions.get(regs - 1);
+
+            long checkpointBuf = cpReg.size();
+
+            long totalAllocated = 0;
+            int pages = 0;
+            long totalTblSize = 0;
+            long totalReplSize = 0;
+
+            for (int i = 0; i < newSegments.length; i++) {
+                DirectMemoryRegion reg = regions.get(i);
+
+                totalAllocated += reg.size();
+
+                newSegments[i] = new Segment(i, reg);
+
+                pages += newSegments[i].pages();
+                totalTblSize += newSegments[i].tableSize();
+                totalReplSize += newSegments[i].replacementSize();
+            }
+
+            segments = newSegments;
+
+            // Publish the flag only after the segments are fully initialized and assigned.
+            started = true;
+
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Started page memory [memoryAllocated=" + readableSize(totalAllocated, false)
+                        + ", pages=" + pages
+                        + ", tableSize=" + readableSize(totalTblSize, false)
+                        + ", replacementSize=" + readableSize(totalReplSize, false)
+                        + ", checkpointBuffer=" + readableSize(checkpointBuf, false)
+                        + ']');
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop(boolean deallocate) throws IgniteInternalException {
+        synchronized (segmentsLock) {
+            if (!started) {
+                return;
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Stopping page memory.");
+            }
+
+            if (segments != null) {
+                for (Segment seg : segments) {
+                    seg.close();
+                }
+            }
+
+            started = false;
+
+            directMemoryProvider.shutdown(deallocate);
+        }
+    }
+
+    /**
+     * Releases a page previously acquired from its segment, holding that segment's read lock while doing so.
+     */
+    @Override
+    public void releasePage(int grpId, long pageId, long page) {
+        assert started;
+
+        Segment owner = segment(grpId, pageId);
+
+        owner.readLock().lock();
+
+        try {
+            owner.releasePage(page);
+        } finally {
+            owner.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long readLock(int grpId, long pageId, long page) {
+        assert started;
+
+        return readLock(page, pageId, false);
+    }
+
+    /**
+     * Acquires a read lock on the page at {@code absPtr}.
+     *
+     * @param absPtr Absolute pointer to the page (header included).
+     * @param pageId Page ID whose tag is checked by the lock unless {@code force} is set.
+     * @param force If {@code true}, the lock is requested with {@code TAG_LOCK_ALWAYS} instead of the page tag.
+     * @param touch If {@code true}, the page access timestamp is updated after locking.
+     * @return Address of the page data (just past the header), or {@code 0} if the lock was not acquired
+     *      (presumably a tag mismatch — see {@code OffheapReadWriteLock#readLock}).
+     */
+    @Override
+    public long readLock(long absPtr, long pageId, boolean force, boolean touch) {
+        assert started;
+
+        // Use the named constant, as elsewhere in this class, rather than a bare -1.
+        int tag = force ? TAG_LOCK_ALWAYS : tag(pageId);
+
+        boolean locked = rwLock.readLock(absPtr + PAGE_LOCK_OFFSET, tag);
+
+        if (!locked) {
+            return 0;
+        }
+
+        if (touch) {
+            writeTimestamp(absPtr, coarseCurrentTimeMillis());
+        }
+
+        assert getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO IGNITE-16612
+
+        return absPtr + PAGE_OVERHEAD;
+    }
+
+    /**
+     * Shortcut for the four-argument {@code readLock} that always updates the page access timestamp.
+     */
+    private long readLock(long absPtr, long pageId, boolean force) {
+        boolean touch = true;
+
+        return readLock(absPtr, pageId, force, touch);
+    }
+
+    /**
+     * Releases a read lock on the page. The group and page IDs are not needed for unlocking.
+     */
+    @Override
+    public void readUnlock(int grpId, long pageId, long page) {
+        assert started;
+
+        readUnlockPage(page);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long writeLock(int grpId, long pageId, long page) {
+        assert started;
+
+        return writeLock(grpId, pageId, page, false);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long writeLock(int grpId, long pageId, long page, boolean restore) {
+        assert started;
+
+        return writeLockPage(page, new FullPageId(pageId, grpId), !restore);
+    }
+
+    /**
+     * Attempts to write-lock the page identified by the group/page ID pair.
+     */
+    @Override
+    public long tryWriteLock(int grpId, long pageId, long page) {
+        assert started;
+
+        FullPageId fullId = new FullPageId(pageId, grpId);
+
+        return tryWriteLockPage(page, fullId, true);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeUnlock(int grpId, long pageId, long page, boolean dirtyFlag) {
+        assert started;
+
+        writeUnlock(grpId, pageId, page, dirtyFlag, false);
+    }
+
+    /**
+     * Releases a write lock on the page identified by the group/page ID pair.
+     */
+    @Override
+    public void writeUnlock(int grpId, long pageId, long page, boolean dirtyFlag, boolean restore) {
+        assert started;
+
+        FullPageId fullId = new FullPageId(pageId, grpId);
+
+        writeUnlockPage(page, fullId, dirtyFlag, restore);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isDirty(int grpId, long pageId, long page) {
+        assert started;
+
+        return isDirty(page);
+    }
+
+    /**
+     * Reads the dirty flag from the page header.
+     *
+     * @param absPtr Absolute pointer to the page.
+     * @return {@code true} if the page is marked dirty.
+     */
+    boolean isDirty(long absPtr) {
+        return PageHeader.dirty(absPtr);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>The page ID is allocated by the page manager, and the page is then materialized in memory. Its data is zeroed, its
+     * header is initialized, it is marked dirty and it is registered in its segment's loaded-pages table. All of this
+     * happens under the segment write lock.
+     *
+     * @throws IgniteOutOfMemoryException If no in-memory page could be obtained; rethrown with data region details.
+     */
+    @Override
+    public long allocatePage(int grpId, int partId, byte flags) throws IgniteInternalCheckedException {
+        assert partId <= MAX_PARTITION_ID || partId == INDEX_PARTITION && flags == FLAG_AUX : "flags = " + flags + ", partId = " + partId;
+
+        assert started;
+
+        long pageId = pmPageMgr.allocatePage(grpId, partId, flags);
+
+        assert pageIndex(pageId) > 0; // Crucial for page tracking: index 0 is the partition super (meta) page.
+
+        // The page is also created in memory and marked dirty so that the next checkpoint writes it. Otherwise the file could
+        // contain an empty page that would be saved in a snapshot and then fail to read, because it has no CRC.
+        Segment seg = segment(grpId, pageId);
+
+        seg.writeLock().lock();
+
+        FullPageId fullId = new FullPageId(pageId, grpId);
+
+        try {
+            // Look up an in-memory copy of this page for the partition's current generation.
+            long relPtr = seg.loadedPages.get(
+                    grpId,
+                    effectivePageId(pageId),
+                    seg.partGeneration(grpId, partId),
+                    INVALID_REL_PTR,
+                    OUTDATED_REL_PTR
+            );
+
+            // A copy from an outdated generation is refreshed and removed from the replacement policy before reuse.
+            if (relPtr == OUTDATED_REL_PTR) {
+                relPtr = seg.refreshOutdatedPage(grpId, pageId, false);
+
+                seg.pageReplacementPolicy.onRemove(relPtr);
+            }
+
+            // Not loaded: take a free page from the segment...
+            if (relPtr == INVALID_REL_PTR) {
+                relPtr = seg.borrowOrAllocateFreePage(pageId);
+            }
+
+            // ...or, if none is free, evict one via the replacement policy.
+            if (relPtr == INVALID_REL_PTR) {
+                relPtr = seg.removePageForReplacement();
+            }
+
+            // Any already-loaded slot for this page ID falls through here as well and is overwritten.
+            long absPtr = seg.absolute(relPtr);
+
+            // Zero the page data, then (re)initialize the header: full page ID, access timestamp and lock tag.
+            setMemory(absPtr + PAGE_OVERHEAD, pageSize(), (byte) 0);
+
+            fullPageId(absPtr, fullId);
+            writeTimestamp(absPtr, coarseCurrentTimeMillis());
+            rwLock.init(absPtr + PAGE_LOCK_OFFSET, tag(pageId));
+
+            assert getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO IGNITE-16612
+
+            assert !isAcquired(absPtr) :
+                    "Pin counter must be 0 for a new page [relPtr=" + hexLong(relPtr)
+                            + ", absPtr=" + hexLong(absPtr) + ", pinCntr=" + PageHeader.pinCount(absPtr) + ']';
+
+            // Mark dirty so the next checkpoint picks the page up (see above).
+            // NOTE(review): the meaning of the last argument is defined in setDirty, outside this view.
+            setDirty(fullId, absPtr, true, true);
+
+            seg.pageReplacementPolicy.onMiss(relPtr);
+
+            // Register the page under the partition's current generation.
+            seg.loadedPages.put(grpId, effectivePageId(pageId), relPtr, seg.partGeneration(grpId, partId));
+        } catch (IgniteOutOfMemoryException oom) {
+            // Rethrow with data region details and tuning hints, keeping the original exception as the cause.
+            IgniteOutOfMemoryException e = new IgniteOutOfMemoryException("Out of memory in data region ["
+                    + "name=" + dataRegionCfg.name()
+                    + ", initSize=" + readableSize(dataRegionCfg.initSize(), false)
+                    + ", maxSize=" + readableSize(dataRegionCfg.maxSize(), false)
+                    + ", persistenceEnabled=" + dataRegionCfg.persistent() + "] Try the following:" + lineSeparator()
+                    + "  ^-- Increase maximum off-heap memory size (PageMemoryDataRegionConfiguration.maxSize)" + lineSeparator()
+                    + "  ^-- Enable eviction or expiration policies"
+            );
+
+            e.initCause(oom);
+
+            throw e;
+        } finally {
+            seg.writeLock().unlock();
+        }
+
+        return pageId;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ByteBuffer pageBuffer(long pageAddr) {
+        return wrapPointer(pageAddr, pageSize());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean freePage(int grpId, long pageId) {
+        assert false : "Free page should be never called directly when persistence is enabled.";
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long partitionMetaPageId(int grpId, int partId) {
+        assert started;
+
+        return pageId(partId, FLAG_DATA, 0);

Review comment:
       I don't think that meta page has "data" flag. Does it? Please check

##########
File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java
##########
@@ -0,0 +1,1602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static java.lang.System.lineSeparator;
+import static org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema.CLOCK_REPLACEMENT_MODE;
+import static org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema.RANDOM_LRU_REPLACEMENT_MODE;
+import static org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema.SEGMENTED_LRU_REPLACEMENT_MODE;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getCrc;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getPageId;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getType;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getVersion;
+import static org.apache.ignite.internal.pagememory.io.PageIo.setPageId;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.dirty;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.fullPageId;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.isAcquired;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.tempBufferPointer;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.writeTimestamp;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.effectivePageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageIndex;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.tag;
+import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
+import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapInt;
+import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapLong;
+import static org.apache.ignite.internal.util.GridUnsafe.copyMemory;
+import static org.apache.ignite.internal.util.GridUnsafe.getInt;
+import static org.apache.ignite.internal.util.GridUnsafe.putIntVolatile;
+import static org.apache.ignite.internal.util.GridUnsafe.setMemory;
+import static org.apache.ignite.internal.util.GridUnsafe.wrapPointer;
+import static org.apache.ignite.internal.util.IgniteUtils.hash;
+import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
+import static org.apache.ignite.internal.util.IgniteUtils.readableSize;
+import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
+import static org.apache.ignite.internal.util.IgniteUtils.toHexString;
+import static org.apache.ignite.internal.util.OffheapReadWriteLock.TAG_LOCK_ALWAYS;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionView;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.pagememory.mem.DirectMemoryProvider;
+import org.apache.ignite.internal.pagememory.mem.DirectMemoryRegion;
+import org.apache.ignite.internal.pagememory.mem.IgniteOutOfMemoryException;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.OffheapReadWriteLock;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Page header structure is described by the following diagram.
+ *
+ * <p>When page is not allocated (in a free list):
+ * <pre>
+ * +--------+------------------------------------------------------+
+ * |8 bytes |         PAGE_SIZE + PAGE_OVERHEAD - 8 bytes          |
+ * +--------+------------------------------------------------------+
+ * |Next ptr|                      Page data                       |
+ * +--------+------------------------------------------------------+
+ * </pre>
+ *
+ * <p>When page is allocated and is in use:
+ * <pre>
+ * +------------------+--------+--------+----+----+--------+--------+----------------------+
+ * |     8 bytes      |8 bytes |8 bytes |4 b |4 b |8 bytes |8 bytes |       PAGE_SIZE      |
+ * +------------------+--------+--------+----+----+--------+--------+----------------------+
+ * | Marker/Timestamp |Rel ptr |Page ID |C ID|PIN | LOCK   |TMP BUF |       Page data      |
+ * +------------------+--------+--------+----+----+--------+--------+----------------------+
+ * </pre>
+ *
+ * <p>Note that first 8 bytes of page header are used either for page marker or for next relative pointer depending on whether the page is
+ * in use or not.
+ */
+@SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
+public class PageMemoryImpl implements PageMemoryEx {
+    /** Logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(PageMemoryImpl.class);
+
+    /** Full relative pointer mask (the low 56 bits set). */
+    public static final long RELATIVE_PTR_MASK = 0xFFFFFFFFFFFFFFL;
+
+    /** Invalid relative pointer value, also used as the "not found" marker in the loaded pages table lookups. */
+    static final long INVALID_REL_PTR = RELATIVE_PTR_MASK;
+
+    /** Pointer value which means that the page is outdated (for example, the group was destroyed or the partition was evicted). */
+    static final long OUTDATED_REL_PTR = INVALID_REL_PTR + 1;
+
+    /** Offset of the page lock within the page header: 8b marker + 8b relative pointer + 8b page ID + 4b group ID + 4b pin count. */
+    public static final int PAGE_LOCK_OFFSET = 32;
+
+    /** 8b Marker/timestamp 8b Relative pointer 8b Page ID 4b Group ID 4b Pin count 8b Lock 8b Temporary buffer. */
+    public static final int PAGE_OVERHEAD = 48;
+
+    /** Try again tag. */
+    public static final int TRY_AGAIN_TAG = -1;
+
+    /** Data region configuration view. */
+    private final PageMemoryDataRegionView dataRegionCfg;
+
+    /** Page IO registry. */
+    private final PageIoRegistry ioRegistry;
+
+    /** Page read/write manager used to allocate page IDs and to read page content from the store. */
+    private final PageReadWriteManager pmPageMgr;
+
+    /** System page size: the configured page size plus {@link #PAGE_OVERHEAD}. */
+    private final int sysPageSize;
+
+    /** Page replacement policy factory, chosen from the configured replacement mode. */
+    private final PageReplacementPolicyFactory pageReplacementPolicyFactory;
+
+    /** Direct memory allocator. */
+    private final DirectMemoryProvider directMemoryProvider;
+
+    /** Segments array; assigned at the end of {@link #start()}, so it may be {@code null} before that. */
+    private volatile Segment[] segments;
+
+    /** Lock for segments changes; guards {@link #start()} and {@link #stop(boolean)}. */
+    private final Object segmentsLock = new Object();
+
+    /** Offheap read write lock instance, used for the per-page locks located at {@link #PAGE_LOCK_OFFSET} in each page header. */
+    private final OffheapReadWriteLock rwLock;
+
+    /** Callback invoked to track changes in pages. {@code Null} if page tracking functionality is disabled. */
+    @Nullable
+    private final PageChangeTracker changeTracker;
+
+    /** Atomic updater for the {@code pageReplacementWarned} field. */
+    private static final AtomicIntegerFieldUpdater<PageMemoryImpl> pageReplacementWarnedFieldUpdater =
+            AtomicIntegerFieldUpdater.newUpdater(PageMemoryImpl.class, "pageReplacementWarned");
+
+    /** Flag indicating page replacement started (rotation with disk), allocating new page requires freeing old one. */
+    private volatile int pageReplacementWarned;
+
+    /**
+     * Region sizes passed to the direct memory provider on start; in {@link #start()} every resulting region but the last one backs a
+     * segment, and the last one is reported as the checkpoint buffer.
+     */
+    private final long[] sizes;
+
+    /** {@code False} if memory was not started or already stopped and is not supposed for any usage. */
+    private volatile boolean started;
+
+    /**
+     * Creates page memory over the given direct memory provider. Memory is not allocated until {@link #start()} is called.
+     *
+     * @param directMemoryProvider Memory allocator to use.
+     * @param dataRegionCfg Data region configuration; its page size and replacement mode are read here.
+     * @param ioRegistry IO registry.
+     * @param sizes Segments sizes.
+     * @param pmPageMgr Page store manager.
+     * @param changeTracker Callback invoked to track changes in pages.
+     * @throws IgniteInternalException If the configured replacement mode is not one of the supported modes.
+     */
+    public PageMemoryImpl(
+            DirectMemoryProvider directMemoryProvider,
+            PageMemoryDataRegionConfiguration dataRegionCfg,
+            PageIoRegistry ioRegistry,
+            long[] sizes,
+            PageReadWriteManager pmPageMgr,
+            @Nullable PageChangeTracker changeTracker
+    ) {
+        this.directMemoryProvider = directMemoryProvider;
+
+        PageMemoryDataRegionView cfgView = (PageMemoryDataRegionView) dataRegionCfg.value();
+
+        this.dataRegionCfg = cfgView;
+        this.ioRegistry = ioRegistry;
+        this.sizes = sizes;
+        this.pmPageMgr = pmPageMgr;
+        this.changeTracker = changeTracker;
+
+        // Every page in memory carries a fixed-size header in front of its data.
+        sysPageSize = cfgView.pageSize() + PAGE_OVERHEAD;
+
+        rwLock = new OffheapReadWriteLock(128);
+
+        pageReplacementPolicyFactory = createPageReplacementPolicyFactory(cfgView.replacementMode());
+    }
+
+    /**
+     * Maps a configured replacement mode to its page replacement policy factory.
+     *
+     * @param replacementMode Replacement mode from the data region configuration.
+     * @return Factory for the given mode.
+     * @throws IgniteInternalException If the mode is not supported.
+     */
+    private static PageReplacementPolicyFactory createPageReplacementPolicyFactory(String replacementMode) {
+        switch (replacementMode) {
+            case RANDOM_LRU_REPLACEMENT_MODE:
+                return new RandomLruPageReplacementPolicyFactory();
+
+            case SEGMENTED_LRU_REPLACEMENT_MODE:
+                return new SegmentedLruPageReplacementPolicyFactory();
+
+            case CLOCK_REPLACEMENT_MODE:
+                return new ClockPageReplacementPolicyFactory();
+
+            default:
+                throw new IgniteInternalException("Unexpected page replacement mode: " + replacementMode);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Does nothing if already started. Otherwise initializes the direct memory provider and builds one segment per memory region,
+     * except for the last region, whose size is only reported as the checkpoint buffer.
+     */
+    @Override
+    public void start() throws IgniteInternalException {
+        synchronized (segmentsLock) {
+            if (started) {
+                return;
+            }
+
+            started = true;
+
+            directMemoryProvider.initialize(sizes);
+
+            List<DirectMemoryRegion> regions = new ArrayList<>(sizes.length);
+
+            for (DirectMemoryRegion next = directMemoryProvider.nextRegion(); next != null; next = directMemoryProvider.nextRegion()) {
+                regions.add(next);
+            }
+
+            int segCnt = regions.size() - 1;
+
+            Segment[] newSegments = new Segment[segCnt];
+
+            long checkpointBufSize = regions.get(segCnt).size();
+
+            long allocatedBytes = 0;
+            int pageCnt = 0;
+            long tableBytes = 0;
+            long replacementBytes = 0;
+
+            for (int idx = 0; idx < segCnt; idx++) {
+                assert idx < newSegments.length;
+
+                DirectMemoryRegion region = regions.get(idx);
+
+                allocatedBytes += region.size();
+
+                Segment seg = new Segment(idx, region);
+
+                newSegments[idx] = seg;
+
+                pageCnt += seg.pages();
+                tableBytes += seg.tableSize();
+                replacementBytes += seg.replacementSize();
+            }
+
+            this.segments = newSegments;
+
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Started page memory [memoryAllocated=" + readableSize(allocatedBytes, false)
+                        + ", pages=" + pageCnt
+                        + ", tableSize=" + readableSize(tableBytes, false)
+                        + ", replacementSize=" + readableSize(replacementBytes, false)
+                        + ", checkpointBuffer=" + readableSize(checkpointBufSize, false)
+                        + ']');
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop(boolean deallocate) throws IgniteInternalException {
+        synchronized (segmentsLock) {
+            if (!started) {
+                return;
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Stopping page memory.");
+            }
+
+            if (segments != null) {
+                for (Segment seg : segments) {
+                    seg.close();
+                }
+            }
+
+            started = false;
+
+            directMemoryProvider.shutdown(deallocate);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>The release is performed under the read lock of the segment owning the page.
+     */
+    @Override
+    public void releasePage(int grpId, long pageId, long page) {
+        assert started;
+
+        Segment owner = segment(grpId, pageId);
+
+        owner.readLock().lock();
+
+        try {
+            owner.releasePage(page);
+        } finally {
+            owner.readLock().unlock();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Locks with the page ID tag and updates the page access timestamp.
+     */
+    @Override
+    public long readLock(int grpId, long pageId, long page) {
+        assert started;
+
+        final boolean force = false;
+        final boolean touch = true;
+
+        return readLock(page, pageId, force, touch);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Returns {@code 0} if the page lock could not be taken; otherwise returns the page data address, i.e. {@code absPtr} shifted by
+     * {@link #PAGE_OVERHEAD}.
+     */
+    @Override
+    public long readLock(long absPtr, long pageId, boolean force, boolean touch) {
+        assert started;
+
+        // NOTE(review): -1 presumably matches OffheapReadWriteLock.TAG_LOCK_ALWAYS (lock regardless of tag) - confirm.
+        int lockTag;
+
+        if (force) {
+            lockTag = -1;
+        } else {
+            lockTag = tag(pageId);
+        }
+
+        if (!rwLock.readLock(absPtr + PAGE_LOCK_OFFSET, lockTag)) {
+            return 0;
+        }
+
+        if (touch) {
+            writeTimestamp(absPtr, coarseCurrentTimeMillis());
+        }
+
+        long pageAddr = absPtr + PAGE_OVERHEAD;
+
+        assert getCrc(pageAddr) == 0; //TODO IGNITE-16612
+
+        return pageAddr;
+    }
+
+    private long readLock(long absPtr, long pageId, boolean force) {
+        return readLock(absPtr, pageId, force, true);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Only the page pointer is used; {@code grpId} and {@code pageId} are not needed to release the read lock.
+     */
+    @Override
+    public void readUnlock(int grpId, long pageId, long page) {
+        assert started;
+
+        readUnlockPage(page);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Same as {@code writeLock(grpId, pageId, page, false)}.
+     */
+    @Override
+    public long writeLock(int grpId, long pageId, long page) {
+        assert started;
+
+        final boolean restore = false;
+
+        return writeLock(grpId, pageId, page, restore);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Delegates to the page-level write lock, passing the negated {@code restore} flag.
+     */
+    @Override
+    public long writeLock(int grpId, long pageId, long page, boolean restore) {
+        assert started;
+
+        FullPageId fullId = new FullPageId(pageId, grpId);
+
+        return writeLockPage(page, fullId, !restore);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Delegates to the non-blocking page-level write lock attempt.
+     */
+    @Override
+    public long tryWriteLock(int grpId, long pageId, long page) {
+        assert started;
+
+        FullPageId fullId = new FullPageId(pageId, grpId);
+
+        return tryWriteLockPage(page, fullId, true);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Same as {@code writeUnlock(grpId, pageId, page, dirtyFlag, false)}.
+     */
+    @Override
+    public void writeUnlock(int grpId, long pageId, long page, boolean dirtyFlag) {
+        assert started;
+
+        final boolean restore = false;
+
+        writeUnlock(grpId, pageId, page, dirtyFlag, restore);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Delegates to the page-level write unlock.
+     */
+    @Override
+    public void writeUnlock(int grpId, long pageId, long page, boolean dirtyFlag, boolean restore) {
+        assert started;
+
+        FullPageId fullId = new FullPageId(pageId, grpId);
+
+        writeUnlockPage(page, fullId, dirtyFlag, restore);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isDirty(int grpId, long pageId, long page) {
+        assert started;
+
+        return isDirty(page);
+    }
+
+    /**
+     * Reads the dirty flag from the page header.
+     *
+     * @param absPtr Absolute pointer to the page.
+     * @return {@code true} if the page is dirty.
+     */
+    boolean isDirty(long absPtr) {
+        return PageHeader.dirty(absPtr);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>The page ID is obtained from the page read/write manager. The page is then immediately placed into memory under the segment
+     * write lock: zeroed, marked dirty and registered in the segment's loaded pages table.
+     */
+    @Override
+    public long allocatePage(int grpId, int partId, byte flags) throws IgniteInternalCheckedException {
+        assert partId <= MAX_PARTITION_ID || partId == INDEX_PARTITION && flags == FLAG_AUX : "flags = " + flags + ", partId = " + partId;
+
+        assert started;
+
+        long pageId = pmPageMgr.allocatePage(grpId, partId, flags);
+
+        assert pageIndex(pageId) > 0; //it's crucial for tracking pages (zero page is super one)
+
+        // The page must be allocated in memory and marked dirty so that it is saved by the next checkpoint.
+        // Otherwise the file could contain an empty page that would be saved in a snapshot and then fail to read,
+        // because it has no CRC.
+        Segment seg = segment(grpId, pageId);
+
+        seg.writeLock().lock();
+
+        FullPageId fullId = new FullPageId(pageId, grpId);
+
+        try {
+            // Look up an existing slot for this page; outdated entries are reported as OUTDATED_REL_PTR.
+            long relPtr = seg.loadedPages.get(
+                    grpId,
+                    effectivePageId(pageId),
+                    seg.partGeneration(grpId, partId),
+                    INVALID_REL_PTR,
+                    OUTDATED_REL_PTR
+            );
+
+            if (relPtr == OUTDATED_REL_PTR) {
+                // Reuse the outdated slot; it is removed from the replacement policy here and re-registered via onMiss below.
+                relPtr = seg.refreshOutdatedPage(grpId, pageId, false);
+
+                seg.pageReplacementPolicy.onRemove(relPtr);
+            }
+
+            if (relPtr == INVALID_REL_PTR) {
+                // No slot yet: take a free page from the segment.
+                relPtr = seg.borrowOrAllocateFreePage(pageId);
+            }
+
+            if (relPtr == INVALID_REL_PTR) {
+                // No free page available: presumably evicts another resident page to obtain a slot.
+                relPtr = seg.removePageForReplacement();
+            }
+
+            long absPtr = seg.absolute(relPtr);
+
+            // A freshly allocated page starts with zeroed data.
+            setMemory(absPtr + PAGE_OVERHEAD, pageSize(), (byte) 0);
+
+            fullPageId(absPtr, fullId);
+            writeTimestamp(absPtr, coarseCurrentTimeMillis());
+            rwLock.init(absPtr + PAGE_LOCK_OFFSET, tag(pageId));
+
+            assert getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO IGNITE-16612
+
+            assert !isAcquired(absPtr) :
+                    "Pin counter must be 0 for a new page [relPtr=" + hexLong(relPtr)
+                            + ", absPtr=" + hexLong(absPtr) + ", pinCntr=" + PageHeader.pinCount(absPtr) + ']';
+
+            setDirty(fullId, absPtr, true, true);
+
+            seg.pageReplacementPolicy.onMiss(relPtr);
+
+            seg.loadedPages.put(grpId, effectivePageId(pageId), relPtr, seg.partGeneration(grpId, partId));
+        } catch (IgniteOutOfMemoryException oom) {
+            // Rethrow with the data region parameters and tuning hints, keeping the original exception as the cause.
+            IgniteOutOfMemoryException e = new IgniteOutOfMemoryException("Out of memory in data region ["
+                    + "name=" + dataRegionCfg.name()
+                    + ", initSize=" + readableSize(dataRegionCfg.initSize(), false)
+                    + ", maxSize=" + readableSize(dataRegionCfg.maxSize(), false)
+                    + ", persistenceEnabled=" + dataRegionCfg.persistent() + "] Try the following:" + lineSeparator()
+                    + "  ^-- Increase maximum off-heap memory size (PageMemoryDataRegionConfiguration.maxSize)" + lineSeparator()
+                    + "  ^-- Enable eviction or expiration policies"
+            );
+
+            e.initCause(oom);
+
+            throw e;
+        } finally {
+            seg.writeLock().unlock();
+        }
+
+        return pageId;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ByteBuffer pageBuffer(long pageAddr) {
+        return wrapPointer(pageAddr, pageSize());
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Not supported by this implementation: fails an assertion when assertions are enabled and otherwise always returns
+     * {@code false}.
+     */
+    @Override
+    public boolean freePage(int grpId, long pageId) {
+        final boolean freed = false;
+
+        assert freed : "Free page should be never called directly when persistence is enabled.";
+
+        return freed;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>The partition meta page is the page with index {@code 0} of the partition, built with the data flag; {@code grpId} is not used.
+     */
+    @Override
+    public long partitionMetaPageId(int grpId, int partId) {
+        assert started;
+
+        final int metaPageIdx = 0;
+
+        return pageId(partId, FLAG_DATA, metaPageIdx);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Reads are not tracked, and the page is not restored.
+     */
+    @Override
+    public long acquirePage(int grpId, long pageId) throws IgniteInternalCheckedException {
+        final boolean restore = false;
+
+        return acquirePage(grpId, pageId, IoStatisticsHolderNoOp.INSTANCE, restore);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Same as {@code acquirePage(grpId, pageId, statHolder, false)}.
+     */
+    @Override
+    public long acquirePage(int grpId, long pageId, IoStatisticsHolder statHolder) throws IgniteInternalCheckedException {
+        assert started;
+
+        final boolean restore = false;
+
+        return acquirePage(grpId, pageId, statHolder, restore);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Reads are not tracked; {@code pageAllocated} is set to {@code true} if a memory slot had to be assigned for the page.
+     */
+    @Override
+    public long acquirePage(int grpId, long pageId, AtomicBoolean pageAllocated) throws IgniteInternalCheckedException {
+        final boolean restore = false;
+
+        return acquirePage(grpId, pageId, IoStatisticsHolderNoOp.INSTANCE, restore, pageAllocated);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long acquirePage(int grpId, long pageId, IoStatisticsHolder statHolder, boolean restore) throws IgniteInternalCheckedException {
+        return acquirePage(grpId, pageId, statHolder, restore, null);
+    }
+
+    /**
+     * Pins a page in memory, loading it if it is not resident yet, and returns its absolute pointer.
+     *
+     * <p>Fast path: under the segment read lock the page is looked up in the loaded pages table; if found, it is pinned and reported to
+     * the replacement policy as a hit.
+     *
+     * <p>Slow path: under the segment write lock the lookup is repeated.
+     * <ul>
+     *     <li>A missing page gets a free slot (or one obtained via replacement). Unless {@code restore} is set, the slot is write-locked
+     *     and its content is read from the page store only after the segment write lock has been released. With {@code restore} the
+     *     slot is zeroed and only the page ID is written.</li>
+     *     <li>An outdated page (only expected for page index 0) is refreshed and zeroed.</li>
+     *     <li>Otherwise the page is a hit.</li>
+     * </ul>
+     *
+     * @param grpId Group ID.
+     * @param pageId Page ID.
+     * @param statHolder Statistics holder used to track logical and physical reads.
+     * @param restore If {@code true}, a newly assigned page is zeroed instead of being read from the page store.
+     * @param pageAllocated If not {@code null}, set to {@code true} when the page was not found in memory and a slot was assigned to it.
+     * @return Absolute pointer to the page; the page data starts {@link #PAGE_OVERHEAD} bytes after it.
+     * @throws IgniteInternalCheckedException If failed, e.g. while reading the page from the page store.
+     */
+    private long acquirePage(
+            int grpId,
+            long pageId,
+            IoStatisticsHolder statHolder,
+            boolean restore,
+            @Nullable AtomicBoolean pageAllocated
+    ) throws IgniteInternalCheckedException {
+        assert started;
+
+        int partId = partitionId(pageId);
+
+        Segment seg = segment(grpId, pageId);
+
+        // Fast path under the shared segment lock. Outdated entries are reported as INVALID_REL_PTR here,
+        // so they fall through to the slow path below.
+        seg.readLock().lock();
+
+        try {
+            long relPtr = seg.loadedPages.get(
+                    grpId,
+                    effectivePageId(pageId),
+                    seg.partGeneration(grpId, partId),
+                    INVALID_REL_PTR,
+                    INVALID_REL_PTR
+            );
+
+            // The page is loaded to the memory.
+            if (relPtr != INVALID_REL_PTR) {
+                long absPtr = seg.absolute(relPtr);
+
+                seg.acquirePage(absPtr);
+
+                seg.pageReplacementPolicy.onHit(relPtr);
+
+                statHolder.trackLogicalRead(absPtr + PAGE_OVERHEAD);
+
+                return absPtr;
+            }
+        } finally {
+            seg.readLock().unlock();
+        }
+
+        FullPageId fullId = new FullPageId(pageId, grpId);
+
+        seg.writeLock().lock();
+
+        long lockedPageAbsPtr = -1;
+        boolean readPageFromStore = false;
+
+        try {
+            // Double-check: another thread may have loaded the page between releasing the read lock and taking the write lock.
+            long relPtr = seg.loadedPages.get(
+                    grpId,
+                    fullId.effectivePageId(),
+                    seg.partGeneration(grpId, partId),
+                    INVALID_REL_PTR,
+                    OUTDATED_REL_PTR
+            );
+
+            long absPtr;
+
+            if (relPtr == INVALID_REL_PTR) {
+                relPtr = seg.borrowOrAllocateFreePage(pageId);
+
+                if (pageAllocated != null) {
+                    pageAllocated.set(true);
+                }
+
+                if (relPtr == INVALID_REL_PTR) {
+                    relPtr = seg.removePageForReplacement();
+                }
+
+                absPtr = seg.absolute(relPtr);
+
+                fullPageId(absPtr, fullId);
+                writeTimestamp(absPtr, coarseCurrentTimeMillis());
+
+                assert !isAcquired(absPtr) :
+                        "Pin counter must be 0 for a new page [relPtr=" + hexLong(relPtr) + ", absPtr=" + hexLong(absPtr) + ']';
+
+                // We can clear dirty flag after the page has been allocated.
+                setDirty(fullId, absPtr, false, false);
+
+                seg.pageReplacementPolicy.onMiss(relPtr);
+
+                seg.loadedPages.put(
+                        grpId,
+                        fullId.effectivePageId(),
+                        relPtr,
+                        seg.partGeneration(grpId, partId)
+                );
+
+                long pageAddr = absPtr + PAGE_OVERHEAD;
+
+                if (!restore) {
+                    readPageFromStore = true;
+                } else {
+                    setMemory(pageAddr, pageSize(), (byte) 0);
+
+                    // Must init page ID in order to ensure RWLock tag consistency.
+                    setPageId(pageAddr, pageId);
+                }
+
+                rwLock.init(absPtr + PAGE_LOCK_OFFSET, tag(pageId));
+
+                if (readPageFromStore) {
+                    // Keep the page write-locked until its content has been read from the store (see the finally block),
+                    // presumably so that concurrent page lockers cannot observe a half-loaded page.
+                    boolean locked = rwLock.writeLock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS);
+
+                    assert locked : "Page ID " + fullId + " expected to be locked";
+
+                    lockedPageAbsPtr = absPtr;
+                }
+            } else if (relPtr == OUTDATED_REL_PTR) {
+                assert pageIndex(pageId) == 0 : fullId;
+
+                relPtr = seg.refreshOutdatedPage(grpId, pageId, false);
+
+                absPtr = seg.absolute(relPtr);
+
+                long pageAddr = absPtr + PAGE_OVERHEAD;
+
+                setMemory(pageAddr, pageSize(), (byte) 0);
+
+                fullPageId(absPtr, fullId);
+                writeTimestamp(absPtr, coarseCurrentTimeMillis());
+                setPageId(pageAddr, pageId);
+
+                assert !isAcquired(absPtr) :
+                        "Pin counter must be 0 for a new page [relPtr=" + hexLong(relPtr) + ", absPtr=" + hexLong(absPtr) + ']';
+
+                rwLock.init(absPtr + PAGE_LOCK_OFFSET, tag(pageId));
+
+                seg.pageReplacementPolicy.onRemove(relPtr);
+                seg.pageReplacementPolicy.onMiss(relPtr);
+            } else {
+                absPtr = seg.absolute(relPtr);
+
+                seg.pageReplacementPolicy.onHit(relPtr);
+            }
+
+            seg.acquirePage(absPtr);
+
+            // A page read from the store is tracked as a physical + logical read in the finally block instead.
+            if (!readPageFromStore) {
+                statHolder.trackLogicalRead(absPtr + PAGE_OVERHEAD);
+            }
+
+            return absPtr;
+        } finally {
+            // The (potentially slow) store read happens after the segment write lock is released.
+            seg.writeLock().unlock();
+
+            if (readPageFromStore) {
+                assert lockedPageAbsPtr != -1 : "Page is expected to have a valid address [pageId=" + fullId
+                        + ", lockedPageAbsPtr=" + hexLong(lockedPageAbsPtr) + ']';
+
+                assert isPageWriteLocked(lockedPageAbsPtr) : "Page is expected to be locked: [pageId=" + fullId + "]";
+
+                long pageAddr = lockedPageAbsPtr + PAGE_OVERHEAD;
+
+                ByteBuffer buf = wrapPointer(pageAddr, pageSize());
+
+                long actualPageId = 0;
+
+                try {
+                    pmPageMgr.read(grpId, pageId, buf, false);
+
+                    statHolder.trackPhysicalAndLogicalRead(pageAddr);
+
+                    actualPageId = getPageId(buf);
+                } finally {
+                    // Unlock with the tag of the page ID actually read; fall back to TAG_LOCK_ALWAYS if the read failed or yielded 0.
+                    rwLock.writeUnlock(lockedPageAbsPtr + PAGE_LOCK_OFFSET, actualPageId == 0 ? TAG_LOCK_ALWAYS : tag(actualPageId));
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int pageSize() {
+        return sysPageSize - PAGE_OVERHEAD;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Includes the {@link #PAGE_OVERHEAD} header in front of the page data.
+     */
+    @Override
+    public int systemPageSize() {
+        return this.sysPageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int realPageSize(int grpId) {
+        return pageSize();

Review comment:
       Please add a TODO here for the future.

##########
File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementFlags.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapLong;
+import static org.apache.ignite.internal.util.GridUnsafe.getLong;
+import static org.apache.ignite.internal.util.GridUnsafe.putLong;
+import static org.apache.ignite.internal.util.GridUnsafe.setMemory;
+
+import java.util.function.LongUnaryOperator;
+
+/**
+ * Clock page replacement algorithm implementation.
+ */
+public class ClockPageReplacementFlags {
+    /** Total number of pages tracked by these flags. */
+    private final int pagesCnt;
+
+    /** Index of the next replacement candidate (the clock "hand"). */
+    private int curIdx;
+
+    /**
+     * Pointer to the memory region holding the page hit flags. The constructor zeroes {@code (pagesCnt + 7) >> 3} bytes there, i.e.
+     * one bit per page.
+     */
+    private final long flagsPtr;
+
+    /**
+     * Constructor.
+     *
+     * @param totalPagesCnt Total pages count.
+     * @param memPtr Pointer to memory region.
+     */
+    ClockPageReplacementFlags(int totalPagesCnt, long memPtr) {
+        pagesCnt = totalPagesCnt;
+        flagsPtr = memPtr;
+
+        setMemory(flagsPtr, (totalPagesCnt + 7) >> 3, (byte) 0);

Review comment:
       Please replace such places with GridUnsafe.zeroMemory(...)

##########
File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/GroupPartitionId.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Pair of group ID with partition ID. Immutable, comparable class, may be used as key in maps.
+ */
+public class GroupPartitionId implements Comparable<GroupPartitionId> {
+    /** Page index of the partition meta (super) page. Such a page always exists for an iterated cache partition. */
+    private static final int METAPAGE_IDX = 0;
+
+    /** Group ID. */
+    private final int grpId;
+
+    /** Partition ID. */
+    private final int partId;
+
+    /**
+     * Creates an immutable (group ID, partition ID) pair.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     */
+    public GroupPartitionId(int grpId, int partId) {
+        this.partId = partId;
+        this.grpId = grpId;
+    }
+
+    /**
+     * Returns flag to be used for partition.
+     *
+     * @param partId Partition ID.
+     */
+    public static byte getFlagByPartId(final int partId) {

Review comment:
       I don't like this method. Is it even used?

##########
File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryEx.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.configuration.storage.StorageException;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * Page memory with some persistence related additions.
+ */
+//TODO IGNITE-16350 Improve javadoc in this class.
+public interface PageMemoryEx extends PageMemory {

Review comment:
       Is there a way to remove this interface completely? We should think about it later

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
##########
@@ -252,6 +252,23 @@ public static String toHexString(byte[] arr, int maxLen) {
         return sb.toString().toUpperCase();
     }
 
+    /**
+     * Returns hex representation of memory region.
+     *
+     * @param addr Pointer in memory.
+     * @param len How much byte to read (should divide 8).

Review comment:
       What? :)
   Please remove "should divide 8"; it makes no sense.

##########
File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java
##########
@@ -0,0 +1,1602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import static java.lang.System.lineSeparator;
+import static org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema.CLOCK_REPLACEMENT_MODE;
+import static org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema.RANDOM_LRU_REPLACEMENT_MODE;
+import static org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema.SEGMENTED_LRU_REPLACEMENT_MODE;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getCrc;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getPageId;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getType;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getVersion;
+import static org.apache.ignite.internal.pagememory.io.PageIo.setPageId;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.dirty;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.fullPageId;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.isAcquired;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.tempBufferPointer;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.writeTimestamp;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.effectivePageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageIndex;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.tag;
+import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
+import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapInt;
+import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapLong;
+import static org.apache.ignite.internal.util.GridUnsafe.copyMemory;
+import static org.apache.ignite.internal.util.GridUnsafe.getInt;
+import static org.apache.ignite.internal.util.GridUnsafe.putIntVolatile;
+import static org.apache.ignite.internal.util.GridUnsafe.setMemory;
+import static org.apache.ignite.internal.util.GridUnsafe.wrapPointer;
+import static org.apache.ignite.internal.util.IgniteUtils.hash;
+import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
+import static org.apache.ignite.internal.util.IgniteUtils.readableSize;
+import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
+import static org.apache.ignite.internal.util.IgniteUtils.toHexString;
+import static org.apache.ignite.internal.util.OffheapReadWriteLock.TAG_LOCK_ALWAYS;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionView;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.pagememory.mem.DirectMemoryProvider;
+import org.apache.ignite.internal.pagememory.mem.DirectMemoryRegion;
+import org.apache.ignite.internal.pagememory.mem.IgniteOutOfMemoryException;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.OffheapReadWriteLock;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Page header structure is described by the following diagram.
+ *
+ * <p>When page is not allocated (in a free list):
+ * <pre>
+ * +--------+------------------------------------------------------+
+ * |8 bytes |         PAGE_SIZE + PAGE_OVERHEAD - 8 bytes          |
+ * +--------+------------------------------------------------------+
+ * |Next ptr|                      Page data                       |
+ * +--------+------------------------------------------------------+
+ * </pre>
+ *
+ * <p>When page is allocated and is in use:
+ * <pre>
+ * +------------------+--------+--------+----+----+--------+--------+----------------------+
+ * |     8 bytes      |8 bytes |8 bytes |4 b |4 b |8 bytes |8 bytes |       PAGE_SIZE      |
+ * +------------------+--------+--------+----+----+--------+--------+----------------------+
+ * | Marker/Timestamp |Rel ptr |Page ID |C ID|PIN | LOCK   |TMP BUF |       Page data      |
+ * +------------------+--------+--------+----+----+--------+--------+----------------------+
+ * </pre>
+ *
+ * <p>Note that the first 8 bytes of the page header hold either the page marker/timestamp (page in use) or the next relative pointer
+ * (page in a free list).
+ */
+@SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
+public class PageMemoryImpl implements PageMemoryEx {
+    /** Logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(PageMemoryImpl.class);
+
+    /** Mask covering all bits of a relative page pointer; its value is also used as {@link #INVALID_REL_PTR}. */
+    public static final long RELATIVE_PTR_MASK = 0xFFFFFFFFFFFFFFL;
+
+    /** Sentinel relative pointer meaning "no page", e.g. the page is not loaded or no free page could be borrowed. */
+    static final long INVALID_REL_PTR = RELATIVE_PTR_MASK;
+
+    /** Sentinel relative pointer meaning the page is outdated (for example, its group was destroyed or its partition was evicted). */
+    static final long OUTDATED_REL_PTR = INVALID_REL_PTR + 1;
+
+    /** Offset of the lock inside the page header: marker (8) + relative pointer (8) + page ID (8) + group ID (4) + pin count (4). */
+    public static final int PAGE_LOCK_OFFSET = 32;
+
+    /** Header size in bytes: 8b Marker/timestamp 8b Relative pointer 8b Page ID 4b Group ID 4b Pin count 8b Lock 8b Temporary buffer. */
+    public static final int PAGE_OVERHEAD = 48;
+
+    /** Try again tag. */
+    public static final int TRY_AGAIN_TAG = -1;
+
+    /** Data region configuration view, captured once in the constructor. */
+    private final PageMemoryDataRegionView dataRegionCfg;
+
+    /** Page IO registry. */
+    private final PageIoRegistry ioRegistry;
+
+    /** Page read/write manager; also used to allocate new page IDs. */
+    private final PageReadWriteManager pmPageMgr;
+
+    /** System page size: the configured page size plus {@link #PAGE_OVERHEAD} header bytes. */
+    private final int sysPageSize;
+
+    /** Page replacement policy factory, chosen from the configured replacement mode. */
+    private final PageReplacementPolicyFactory pageReplacementPolicyFactory;
+
+    /** Provider of the direct memory regions backing the segments. */
+    private final DirectMemoryProvider directMemoryProvider;
+
+    /** Memory segments; {@code null} until {@link #start()} assigns them. */
+    private volatile Segment[] segments;
+
+    /** Monitor serializing {@link #start()} and {@link #stop(boolean)}. */
+    private final Object segmentsLock = new Object();
+
+    /** Off-heap read-write lock used for per-page locks (the lock word is at {@link #PAGE_LOCK_OFFSET} in the page header). */
+    private final OffheapReadWriteLock rwLock;
+
+    /** Callback invoked to track changes in pages. {@code Null} if page tracking functionality is disabled. */
+    @Nullable
+    private final PageChangeTracker changeTracker;
+
+    /** Atomic updater for the {@link #pageReplacementWarned} field. */
+    private static final AtomicIntegerFieldUpdater<PageMemoryImpl> pageReplacementWarnedFieldUpdater =
+            AtomicIntegerFieldUpdater.newUpdater(PageMemoryImpl.class, "pageReplacementWarned");
+
+    /** Flag indicating page replacement started (rotation with disk), allocating new page requires freeing old one. */
+    private volatile int pageReplacementWarned;
+
+    /** Segment sizes, passed to the direct memory provider on start. */
+    private final long[] sizes;
+
+    /** {@code False} if memory was not started or has already been stopped and must not be used. */
+    private volatile boolean started;
+
+    /**
+     * Creates a page memory instance. Direct memory is not requested until {@link #start()} is called.
+     *
+     * @param directMemoryProvider Memory allocator to use.
+     * @param dataRegionCfg Data region configuration; its current value is captured once, here.
+     * @param ioRegistry IO registry.
+     * @param sizes Segments sizes.
+     * @param pmPageMgr Page store manager.
+     * @param changeTracker Callback invoked to track changes in pages, or {@code null} to disable tracking.
+     * @throws IgniteInternalException If the configured page replacement mode is not supported.
+     */
+    public PageMemoryImpl(
+            DirectMemoryProvider directMemoryProvider,
+            PageMemoryDataRegionConfiguration dataRegionCfg,
+            PageIoRegistry ioRegistry,
+            long[] sizes,
+            PageReadWriteManager pmPageMgr,
+            @Nullable PageChangeTracker changeTracker
+    ) {
+        PageMemoryDataRegionView regionView = (PageMemoryDataRegionView) dataRegionCfg.value();
+
+        this.directMemoryProvider = directMemoryProvider;
+        this.dataRegionCfg = regionView;
+        this.ioRegistry = ioRegistry;
+        this.sizes = sizes;
+        this.pmPageMgr = pmPageMgr;
+        this.changeTracker = changeTracker;
+
+        // Every page slot carries a fixed-size header in front of the page data.
+        sysPageSize = regionView.pageSize() + PAGE_OVERHEAD;
+
+        rwLock = new OffheapReadWriteLock(128);
+
+        pageReplacementPolicyFactory = replacementPolicyFactoryFor(regionView.replacementMode());
+    }
+
+    /**
+     * Maps a configured replacement mode name to a new page replacement policy factory.
+     *
+     * @param replacementMode Replacement mode taken from the data region configuration.
+     * @return Policy factory matching the mode.
+     * @throws IgniteInternalException If the mode is not supported.
+     */
+    private static PageReplacementPolicyFactory replacementPolicyFactoryFor(String replacementMode) {
+        switch (replacementMode) {
+            case RANDOM_LRU_REPLACEMENT_MODE:
+                return new RandomLruPageReplacementPolicyFactory();
+            case SEGMENTED_LRU_REPLACEMENT_MODE:
+                return new SegmentedLruPageReplacementPolicyFactory();
+            case CLOCK_REPLACEMENT_MODE:
+                return new ClockPageReplacementPolicyFactory();
+            default:
+                throw new IgniteInternalException("Unexpected page replacement mode: " + replacementMode);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Drains all regions from the direct memory provider. Every region except the last becomes a {@link Segment}; the size of the
+     * last one is reported as the checkpoint buffer. Calling this method on already started memory does nothing.
+     */
+    @Override
+    public void start() throws IgniteInternalException {
+        synchronized (segmentsLock) {
+            if (started) {
+                return;
+            }
+
+            started = true;
+
+            directMemoryProvider.initialize(sizes);
+
+            List<DirectMemoryRegion> regions = new ArrayList<>(sizes.length);
+
+            for (DirectMemoryRegion next = directMemoryProvider.nextRegion(); next != null; next = directMemoryProvider.nextRegion()) {
+                regions.add(next);
+            }
+
+            // The last region is not turned into a segment: it is reported as the checkpoint buffer.
+            int segmentCnt = regions.size() - 1;
+
+            Segment[] newSegments = new Segment[segmentCnt];
+
+            long checkpointBufSize = regions.get(segmentCnt).size();
+
+            long memAllocated = 0;
+            int pageCnt = 0;
+            long tblSize = 0;
+            long replSize = 0;
+
+            for (int idx = 0; idx < segmentCnt; idx++) {
+                DirectMemoryRegion region = regions.get(idx);
+
+                memAllocated += region.size();
+
+                Segment segment = new Segment(idx, region);
+
+                newSegments[idx] = segment;
+
+                pageCnt += segment.pages();
+                tblSize += segment.tableSize();
+                replSize += segment.replacementSize();
+            }
+
+            this.segments = newSegments;
+
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Started page memory [memoryAllocated=" + readableSize(memAllocated, false)
+                        + ", pages=" + pageCnt
+                        + ", tableSize=" + readableSize(tblSize, false)
+                        + ", replacementSize=" + readableSize(replSize, false)
+                        + ", checkpointBuffer=" + readableSize(checkpointBufSize, false)
+                        + ']');
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop(boolean deallocate) throws IgniteInternalException {
+        synchronized (segmentsLock) {
+            if (!started) {
+                return;
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Stopping page memory.");
+            }
+
+            if (segments != null) {
+                for (Segment seg : segments) {
+                    seg.close();
+                }
+            }
+
+            started = false;
+
+            directMemoryProvider.shutdown(deallocate);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Releasing a page only requires the shared (read) lock of the owning segment.
+     */
+    @Override
+    public void releasePage(int grpId, long pageId, long page) {
+        assert started;
+
+        Segment owner = segment(grpId, pageId);
+
+        owner.readLock().lock();
+
+        try {
+            owner.releasePage(page);
+        } finally {
+            owner.readLock().unlock();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Takes a non-forced read lock and refreshes the page access timestamp.
+     */
+    @Override
+    public long readLock(int grpId, long pageId, long page) {
+        assert started;
+
+        return readLock(page, pageId, false, true);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Returns the page data address (just past the page header), or {@code 0} if the lock could not be taken.
+     */
+    @Override
+    public long readLock(long absPtr, long pageId, boolean force, boolean touch) {
+        assert started;
+
+        // NOTE(review): forced locks pass -1 instead of the page tag; presumably equal to TAG_LOCK_ALWAYS - confirm.
+        int lockTag = force ? -1 : tag(pageId);
+
+        if (!rwLock.readLock(absPtr + PAGE_LOCK_OFFSET, lockTag)) {
+            return 0;
+        }
+
+        if (touch) {
+            writeTimestamp(absPtr, coarseCurrentTimeMillis());
+        }
+
+        long pageAddr = absPtr + PAGE_OVERHEAD;
+
+        assert getCrc(pageAddr) == 0; //TODO IGNITE-16612
+
+        return pageAddr;
+    }
+
+    private long readLock(long absPtr, long pageId, boolean force) {
+        return readLock(absPtr, pageId, force, true);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Only the page pointer is needed to release the read lock; the group and page IDs are ignored.
+     */
+    @Override
+    public void readUnlock(int grpId, long pageId, long page) {
+        assert started;
+
+        readUnlockPage(page);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long writeLock(int grpId, long pageId, long page) {
+        assert started;
+
+        return writeLock(grpId, pageId, page, false);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Delegates to {@code writeLockPage}, passing the negated {@code restore} flag.
+     */
+    @Override
+    public long writeLock(int grpId, long pageId, long page, boolean restore) {
+        assert started;
+
+        FullPageId fullId = new FullPageId(pageId, grpId);
+
+        return writeLockPage(page, fullId, !restore);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Delegates to {@code tryWriteLockPage} for the page identified by the group and page IDs.
+     */
+    @Override
+    public long tryWriteLock(int grpId, long pageId, long page) {
+        assert started;
+
+        FullPageId fullId = new FullPageId(pageId, grpId);
+
+        return tryWriteLockPage(page, fullId, true);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeUnlock(int grpId, long pageId, long page, boolean dirtyFlag) {
+        assert started;
+
+        writeUnlock(grpId, pageId, page, dirtyFlag, false);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Delegates to {@code writeUnlockPage} for the page identified by the group and page IDs.
+     */
+    @Override
+    public void writeUnlock(int grpId, long pageId, long page, boolean dirtyFlag, boolean restore) {
+        assert started;
+
+        FullPageId fullId = new FullPageId(pageId, grpId);
+
+        writeUnlockPage(page, fullId, dirtyFlag, restore);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Only the page pointer is consulted; the group and page IDs are not used.
+     */
+    @Override
+    public boolean isDirty(int grpId, long pageId, long page) {
+        assert started;
+
+        return isDirty(page);
+    }
+
+    /**
+     * Reads the dirty flag from the page header.
+     *
+     * @param absPtr Absolute pointer to the page header.
+     * @return {@code true} if the page is marked dirty.
+     */
+    boolean isDirty(long absPtr) {
+        boolean dirtyFlag = dirty(absPtr);
+
+        return dirtyFlag;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>A new page ID is obtained from the page read/write manager. The page is then immediately placed in memory, zeroed and marked
+     * dirty, so that it is written by the next checkpoint.
+     */
+    @Override
+    public long allocatePage(int grpId, int partId, byte flags) throws IgniteInternalCheckedException {
+        // Either a data partition ID (up to MAX_PARTITION_ID) or the index partition with the AUX flag.
+        assert partId <= MAX_PARTITION_ID || (partId == INDEX_PARTITION && flags == FLAG_AUX)
+                : "flags = " + flags + ", partId = " + partId;
+
+        assert started;
+
+        long pageId = pmPageMgr.allocatePage(grpId, partId, flags);
+
+        assert pageIndex(pageId) > 0; //it's crucial for tracking pages (zero page is super one)
+
+        // We need to allocate page in memory for marking it dirty to save it in the next checkpoint.
+        // Otherwise it is possible that on file will be empty page which will be saved at snapshot and read with error
+        // because there is no crc inside them.
+        Segment seg = segment(grpId, pageId);
+
+        // Built before locking, so that nothing can throw between lock() and the try block and leak the segment write lock.
+        FullPageId fullId = new FullPageId(pageId, grpId);
+
+        seg.writeLock().lock();
+
+        try {
+            long relPtr = seg.loadedPages.get(
+                    grpId,
+                    effectivePageId(pageId),
+                    seg.partGeneration(grpId, partId),
+                    INVALID_REL_PTR,
+                    OUTDATED_REL_PTR
+            );
+
+            if (relPtr == OUTDATED_REL_PTR) {
+                // Refresh the outdated entry and drop it from the replacement policy; it is re-registered via onMiss below.
+                relPtr = seg.refreshOutdatedPage(grpId, pageId, false);
+
+                seg.pageReplacementPolicy.onRemove(relPtr);
+            }
+
+            if (relPtr == INVALID_REL_PTR) {
+                relPtr = seg.borrowOrAllocateFreePage(pageId);
+            }
+
+            if (relPtr == INVALID_REL_PTR) {
+                // No free page available: obtain one through page replacement.
+                relPtr = seg.removePageForReplacement();
+            }
+
+            long absPtr = seg.absolute(relPtr);
+
+            setMemory(absPtr + PAGE_OVERHEAD, pageSize(), (byte) 0);
+
+            fullPageId(absPtr, fullId);
+            writeTimestamp(absPtr, coarseCurrentTimeMillis());
+            rwLock.init(absPtr + PAGE_LOCK_OFFSET, tag(pageId));
+
+            assert getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO IGNITE-16612
+
+            assert !isAcquired(absPtr) :
+                    "Pin counter must be 0 for a new page [relPtr=" + hexLong(relPtr)
+                            + ", absPtr=" + hexLong(absPtr) + ", pinCntr=" + PageHeader.pinCount(absPtr) + ']';
+
+            setDirty(fullId, absPtr, true, true);
+
+            seg.pageReplacementPolicy.onMiss(relPtr);
+
+            seg.loadedPages.put(grpId, effectivePageId(pageId), relPtr, seg.partGeneration(grpId, partId));
+        } catch (IgniteOutOfMemoryException oom) {
+            // Rethrow with a message describing the data region and possible remedies, keeping the original as the cause.
+            IgniteOutOfMemoryException e = new IgniteOutOfMemoryException("Out of memory in data region ["
+                    + "name=" + dataRegionCfg.name()
+                    + ", initSize=" + readableSize(dataRegionCfg.initSize(), false)
+                    + ", maxSize=" + readableSize(dataRegionCfg.maxSize(), false)
+                    + ", persistenceEnabled=" + dataRegionCfg.persistent() + "] Try the following:" + lineSeparator()
+                    + "  ^-- Increase maximum off-heap memory size (PageMemoryDataRegionConfiguration.maxSize)" + lineSeparator()
+                    + "  ^-- Enable eviction or expiration policies"
+            );
+
+            e.initCause(oom);
+
+            throw e;
+        } finally {
+            seg.writeLock().unlock();
+        }
+
+        return pageId;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ByteBuffer pageBuffer(long pageAddr) {
+        return wrapPointer(pageAddr, pageSize());
+    }
+
+    /**
+     * Not supported: with persistence enabled, pages must never be freed directly.
+     *
+     * @param grpId Group ID (ignored).
+     * @param pageId Page ID (ignored).
+     * @return Always {@code false}; with assertions enabled the call fails with an {@link AssertionError} instead.
+     */
+    @Override
+    public boolean freePage(int grpId, long pageId) {
+        assert false : "Free page should be never called directly when persistence is enabled.";
+
+        return false;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>The meta page is page index {@code 0} of the partition, with the {@code FLAG_DATA} flag; the group ID does not affect the
+     * result.
+     */
+    @Override
+    public long partitionMetaPageId(int grpId, int partId) {
+        assert started;
+
+        final int metaPageIdx = 0;
+
+        return pageId(partId, FLAG_DATA, metaPageIdx);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long acquirePage(int grpId, long pageId) throws IgniteInternalCheckedException {
+        return acquirePage(grpId, pageId, IoStatisticsHolderNoOp.INSTANCE, false);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long acquirePage(int grpId, long pageId, IoStatisticsHolder statHolder) throws IgniteInternalCheckedException {
+        assert started;
+
+        return acquirePage(grpId, pageId, statHolder, false);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long acquirePage(int grpId, long pageId, AtomicBoolean pageAllocated) throws IgniteInternalCheckedException {
+        return acquirePage(grpId, pageId, IoStatisticsHolderNoOp.INSTANCE, false, pageAllocated);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long acquirePage(int grpId, long pageId, IoStatisticsHolder statHolder, boolean restore) throws IgniteInternalCheckedException {
+        return acquirePage(grpId, pageId, statHolder, restore, null);
+    }
+
+    private long acquirePage(
+            int grpId,
+            long pageId,
+            IoStatisticsHolder statHolder,
+            boolean restore,
+            @Nullable AtomicBoolean pageAllocated
+    ) throws IgniteInternalCheckedException {
+        assert started;
+
+        int partId = partitionId(pageId);
+
+        Segment seg = segment(grpId, pageId);
+
+        seg.readLock().lock();
+
+        try {
+            long relPtr = seg.loadedPages.get(
+                    grpId,
+                    effectivePageId(pageId),
+                    seg.partGeneration(grpId, partId),
+                    INVALID_REL_PTR,
+                    INVALID_REL_PTR
+            );
+
+            // The page is loaded to the memory.
+            if (relPtr != INVALID_REL_PTR) {
+                long absPtr = seg.absolute(relPtr);
+
+                seg.acquirePage(absPtr);
+
+                seg.pageReplacementPolicy.onHit(relPtr);
+
+                statHolder.trackLogicalRead(absPtr + PAGE_OVERHEAD);
+
+                return absPtr;
+            }
+        } finally {
+            seg.readLock().unlock();
+        }
+
+        FullPageId fullId = new FullPageId(pageId, grpId);
+
+        seg.writeLock().lock();
+
+        long lockedPageAbsPtr = -1;
+        boolean readPageFromStore = false;
+
+        try {
+            // Double-check.
+            long relPtr = seg.loadedPages.get(
+                    grpId,
+                    fullId.effectivePageId(),
+                    seg.partGeneration(grpId, partId),
+                    INVALID_REL_PTR,
+                    OUTDATED_REL_PTR
+            );
+
+            long absPtr;
+
+            if (relPtr == INVALID_REL_PTR) {
+                relPtr = seg.borrowOrAllocateFreePage(pageId);
+
+                if (pageAllocated != null) {
+                    pageAllocated.set(true);
+                }
+
+                if (relPtr == INVALID_REL_PTR) {
+                    relPtr = seg.removePageForReplacement();
+                }
+
+                absPtr = seg.absolute(relPtr);
+
+                fullPageId(absPtr, fullId);
+                writeTimestamp(absPtr, coarseCurrentTimeMillis());
+
+                assert !isAcquired(absPtr) :
+                        "Pin counter must be 0 for a new page [relPtr=" + hexLong(relPtr) + ", absPtr=" + hexLong(absPtr) + ']';
+
+                // We can clear dirty flag after the page has been allocated.
+                setDirty(fullId, absPtr, false, false);
+
+                seg.pageReplacementPolicy.onMiss(relPtr);
+
+                seg.loadedPages.put(
+                        grpId,
+                        fullId.effectivePageId(),
+                        relPtr,
+                        seg.partGeneration(grpId, partId)
+                );
+
+                long pageAddr = absPtr + PAGE_OVERHEAD;
+
+                if (!restore) {
+                    readPageFromStore = true;
+                } else {
+                    setMemory(absPtr + PAGE_OVERHEAD, pageSize(), (byte) 0);
+
+                    // Must init page ID in order to ensure RWLock tag consistency.
+                    setPageId(pageAddr, pageId);
+                }
+
+                rwLock.init(absPtr + PAGE_LOCK_OFFSET, tag(pageId));
+
+                if (readPageFromStore) {
+                    boolean locked = rwLock.writeLock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS);
+
+                    assert locked : "Page ID " + fullId + " expected to be locked";
+
+                    lockedPageAbsPtr = absPtr;
+                }
+            } else if (relPtr == OUTDATED_REL_PTR) {
+                assert pageIndex(pageId) == 0 : fullId;
+
+                relPtr = seg.refreshOutdatedPage(grpId, pageId, false);
+
+                absPtr = seg.absolute(relPtr);
+
+                long pageAddr = absPtr + PAGE_OVERHEAD;
+
+                setMemory(pageAddr, pageSize(), (byte) 0);
+
+                fullPageId(absPtr, fullId);
+                writeTimestamp(absPtr, coarseCurrentTimeMillis());
+                setPageId(pageAddr, pageId);
+
+                assert !isAcquired(absPtr) :
+                        "Pin counter must be 0 for a new page [relPtr=" + hexLong(relPtr) + ", absPtr=" + hexLong(absPtr) + ']';
+
+                rwLock.init(absPtr + PAGE_LOCK_OFFSET, tag(pageId));
+
+                seg.pageReplacementPolicy.onRemove(relPtr);
+                seg.pageReplacementPolicy.onMiss(relPtr);
+            } else {
+                absPtr = seg.absolute(relPtr);
+
+                seg.pageReplacementPolicy.onHit(relPtr);
+            }
+
+            seg.acquirePage(absPtr);
+
+            if (!readPageFromStore) {
+                statHolder.trackLogicalRead(absPtr + PAGE_OVERHEAD);
+            }
+
+            return absPtr;
+        } catch (IgniteOutOfMemoryException oom) {
+            throw oom;

Review comment:
       What is this? Please add a comment




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org