You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/06/01 15:34:09 UTC

[ignite-3] branch main updated: IGNITE-16933 PageMemory-based MV storage implementation (#814)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 1d76a6ee4 IGNITE-16933 PageMemory-based MV storage implementation (#814)
1d76a6ee4 is described below

commit 1d76a6ee44fa37304b2c38cc1de677be966a63b0
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Wed Jun 1 19:34:05 2022 +0400

    IGNITE-16933 PageMemory-based MV storage implementation (#814)
---
 .../internal/pagememory/PageMemoryDataRegion.java  |   5 +-
 .../pagememory/datapage/DataPageReader.java        |  97 ++++
 .../datapage/NonFragmentableDataPageReader.java    | 134 +++++
 .../pagememory/datapage/PageMemoryTraversal.java   |  50 ++
 .../datapage/ReadPageMemoryRowValue.java           | 137 ++++++
 .../internal/pagememory/freelist/FreeList.java     |   6 +-
 .../internal/pagememory/io/AbstractDataPageIo.java |  92 +++-
 .../internal/pagememory/io/DataPagePayload.java    |   9 +
 .../ignite/internal/schema/ByteBufferRow.java      |  10 +-
 .../internal/storage/MvPartitionStorage.java       |  13 +-
 .../storage/NoUncommittedVersionException.java}    |  18 +-
 .../storage/AbstractMvPartitionStorageTest.java    | 542 ++++++++++++++++-----
 .../internal/storage/BaseMvStoragesTest.java       |   1 -
 .../storage/basic/TestMvPartitionStorage.java      |   8 +-
 .../storage/basic/TestMvPartitionStorageTest.java  |  16 +-
 .../pagememory/AbstractPageMemoryDataRegion.java   |   6 +-
 .../pagememory/PageMemoryStorageEngine.java        |   3 +-
 .../storage/pagememory/PageMemoryTableStorage.java |  17 +-
 .../internal/storage/pagememory/TableTree.java     |   4 +-
 .../pagememory/VolatilePageMemoryDataRegion.java   | 114 ++++-
 .../pagememory/VolatilePageMemoryTableStorage.java |   2 +-
 .../internal/storage/pagememory/mv/LinkRowId.java} |  35 +-
 .../mv/PageMemoryMvPartitionStorage.java           | 521 ++++++++++++++++++++
 .../pagememory/mv/PageMemoryMvStorageIoModule.java |  45 ++
 .../storage/pagememory/mv/PartitionlessLinks.java  | 134 +++++
 .../pagememory/mv/ReadLatestRowVersion.java        |  95 ++++
 .../pagememory/mv/ReadRowVersionValue.java}        |  25 +-
 .../RowIdIsInvalidForModificationsException.java}  |  19 +-
 .../internal/storage/pagememory/mv/RowVersion.java | 180 +++++++
 .../storage/pagememory/mv/RowVersionFreeList.java  | 153 ++++++
 .../pagememory/mv/ScanVersionChainByTimestamp.java | 120 +++++
 .../internal/storage/pagememory/mv/Timestamps.java |  70 +++
 .../storage/pagememory/mv/TransactionIds.java      |  57 +++
 .../storage/pagememory/mv/VersionChain.java        | 107 ++++
 .../pagememory/mv/VersionChainDataPageReader.java  |  84 ++++
 .../pagememory/mv/VersionChainFreeList.java        | 164 +++++++
 .../storage/pagememory/mv/VersionChainLink.java}   |  34 +-
 .../storage/pagememory/mv/VersionChainTree.java    | 114 +++++
 .../storage/pagememory/mv/io/RowVersionDataIo.java | 106 ++++
 .../pagememory/mv/io/VersionChainDataIo.java       |  95 ++++
 .../pagememory/mv/io/VersionChainInnerIo.java      |  86 ++++
 .../storage/pagememory/mv/io/VersionChainIo.java}  |  21 +-
 .../pagememory/mv/io/VersionChainLeafIo.java       |  86 ++++
 .../pagememory/mv/io/VersionChainMetaIo.java}      |  28 +-
 ...ache.ignite.internal.pagememory.io.PageIoModule |   1 +
 .../mv/PageMemoryMvPartitionStorageTest.java       | 144 ++++++
 .../pagememory/mv/PartitionlessLinksTest.java      |  68 +++
 .../storage/rocksdb/RocksDbMvPartitionStorage.java |   4 +
 .../rocksdb/RocksDbMvPartitionStorageTest.java     |   9 +-
 .../org/apache/ignite/internal/tx/Timestamp.java   |  12 +-
 50 files changed, 3623 insertions(+), 278 deletions(-)

diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
index dbc6e3c2f..46032b6b6 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.internal.pagememory;
 
-import org.jetbrains.annotations.Nullable;
-
 /**
  * Data region based on {@link PageMemory}.
  */
@@ -29,8 +27,7 @@ public interface PageMemoryDataRegion {
     boolean persistent();
 
     /**
-     * Returns page memory, {@code null} if not started.
+     * Returns page memory or throws an exception if not started.
      */
-    @Nullable
     PageMemory pageMemory();
 }
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/DataPageReader.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/DataPageReader.java
new file mode 100644
index 000000000..9100edfa6
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/DataPageReader.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.datapage;
+
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.itemId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo;
+import org.apache.ignite.internal.pagememory.io.DataPagePayload;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Contains logic for reading values from page memory data pages (this includes handling multi-page values).
+ */
+public class DataPageReader {
+    private final PageMemory pageMemory;
+    private final int groupId;
+    private final IoStatisticsHolder statisticsHolder;
+
+    /**
+     * Constructs a new instance.
+     *
+     * @param pageMemory       Page memory that will be used to lock and access memory.
+     * @param groupId          ID of the cache group with which the reader works (all pages must belong to this group)
+     * @param statisticsHolder used to track statistics about operations
+     */
+    public DataPageReader(PageMemory pageMemory, int groupId, IoStatisticsHolder statisticsHolder) {
+        this.pageMemory = pageMemory;
+        this.groupId = groupId;
+        this.statisticsHolder = statisticsHolder;
+    }
+
+    /**
+     * Traverses page memory starting at the given link. At each step, reads the current data payload and feeds it to the given
+     * {@link PageMemoryTraversal} object which updates itself (usually) and returns next link to continue traversal
+     * (or {@link PageMemoryTraversal#STOP_TRAVERSAL} to stop).
+     *
+     * @param link Row link
+     * @param traversal Object consuming payloads and controlling the traversal.
+     * @param argument Argument that is passed to the traversal.
+     * @throws IgniteInternalCheckedException If failed
+     * @see org.apache.ignite.internal.pagememory.util.PageIdUtils#link(long, int)
+     * @see PageMemoryTraversal
+     */
+    public <T> void traverse(final long link, PageMemoryTraversal<T> traversal, @Nullable T argument)
+            throws IgniteInternalCheckedException {
+        assert link != 0;
+
+        int pageSize = pageMemory.realPageSize(groupId);
+
+        long currentLink = link;
+
+        do {
+            final long pageId = pageId(currentLink);
+            final long page = pageMemory.acquirePage(groupId, pageId, statisticsHolder);
+
+            try {
+                long pageAddr = pageMemory.readLock(groupId, pageId, page);
+                assert pageAddr != 0L : currentLink;
+
+                try {
+                    AbstractDataPageIo<?> dataIo = pageMemory.ioRegistry().resolve(pageAddr);
+
+                    int itemId = itemId(currentLink);
+
+                    DataPagePayload data = dataIo.readPayload(pageAddr, itemId, pageSize);
+
+                    currentLink = traversal.consumePagePayload(currentLink, pageAddr, data, argument);
+                } finally {
+                    pageMemory.readUnlock(groupId, pageId, page);
+                }
+            } finally {
+                pageMemory.releasePage(groupId, pageId, page);
+            }
+        } while (currentLink != PageMemoryTraversal.STOP_TRAVERSAL);
+
+        traversal.finish();
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/NonFragmentableDataPageReader.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/NonFragmentableDataPageReader.java
new file mode 100644
index 000000000..3bf834136
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/NonFragmentableDataPageReader.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.datapage;
+
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.itemId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo;
+import org.apache.ignite.internal.pagememory.io.DataPagePayload;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Contains logic for reading values (that cannot be fragmented, that is, cannot occupy more than one page) from page memory data pages.
+ */
+public abstract class NonFragmentableDataPageReader<T> {
+    private final PageMemory pageMemory;
+    private final int groupId;
+    private final IoStatisticsHolder statisticsHolder;
+
+    /**
+     * Constructs a new instance.
+     *
+     * @param pageMemory       page memory that will be used to lock and access memory
+     * @param groupId          ID of the cache group with which the reader works (all pages must belong to this group)
+     * @param statisticsHolder used to track statistics about operations
+     */
+    public NonFragmentableDataPageReader(PageMemory pageMemory, int groupId, IoStatisticsHolder statisticsHolder) {
+        this.pageMemory = pageMemory;
+        this.groupId = groupId;
+        this.statisticsHolder = statisticsHolder;
+    }
+
+    /**
+     * Returns a row by link. If it turns out the value was fragmented, an exception is thrown, as this implementation
+     * cannot handle fragmented values.
+     *
+     * @param link Row link
+     * @return row object assembled from the row bytes
+     * @throws IgniteInternalCheckedException If failed
+     * @see org.apache.ignite.internal.pagememory.util.PageIdUtils#link(long, int)
+     */
+    @Nullable
+    public T getRowByLink(final long link) throws IgniteInternalCheckedException {
+        assert link != 0;
+
+        int pageSize = pageMemory.realPageSize(groupId);
+
+        final long pageId = pageId(link);
+
+        final long page = pageMemory.acquirePage(groupId, pageId, statisticsHolder);
+
+        try {
+            long pageAddr = pageMemory.readLock(groupId, pageId, page);
+
+            assert pageAddr != 0L : link;
+
+            try {
+                AbstractDataPageIo<?> dataIo = pageMemory.ioRegistry().resolve(pageAddr);
+
+                int itemId = itemId(link);
+
+                if (handleNonExistentItemsGracefully() && !dataIo.itemExists(pageAddr, itemId, pageSize)) {
+                    return rowForNonExistingItem(pageId, itemId);
+                }
+
+                DataPagePayload data = dataIo.readPayload(pageAddr, itemId, pageSize);
+
+                if (data.hasMoreFragments()) {
+                    throw new IllegalStateException("Value for link " + link + " is fragmented, which is not supported");
+                }
+
+                return readRowFromAddress(link, pageAddr + data.offset());
+
+            } finally {
+                pageMemory.readUnlock(groupId, pageId, page);
+            }
+        } finally {
+            pageMemory.releasePage(groupId, pageId, page);
+        }
+    }
+
+    /**
+     * Reads row object from a contiguous region of memory represented by its starting address. The memory region contains exactly
+     * the bytes representing the row.
+     *
+     * @param link            row link
+     * @param pageAddr        address of memory containing all row bytes
+     * @return row object
+     * @see org.apache.ignite.internal.pagememory.util.PageIdUtils#link(long, int)
+     */
+    protected abstract T readRowFromAddress(long link, long pageAddr);
+
+    /**
+     * Returns {@code true} if graceful handling of non-existent items should be enabled.
+     * If it is enabled and a link with a non-existent item ID is provided, {@link #getRowByLink(long)} will return
+     * the result of {@link #rowForNonExistingItem(long, long)} invocation.
+     * If it is disabled, then an assertion will fail.
+     *
+     * @return {@code true} if graceful handling of non-existent items should be enabled
+     */
+    protected boolean handleNonExistentItemsGracefully() {
+        return false;
+    }
+
+    /**
+     * Returns a special row value that can be used as a marker to specify that the given itemId does not actually exist.
+     *
+     * @param pageId ID of the page
+     * @param itemId ID of the item
+     * @return special row value
+     */
+    @Nullable
+    protected T rowForNonExistingItem(long pageId, long itemId) {
+        throw new IllegalStateException("Item is invalid for pageId=" + pageId + ", itemId=" + itemId);
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/PageMemoryTraversal.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/PageMemoryTraversal.java
new file mode 100644
index 000000000..53fcedd01
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/PageMemoryTraversal.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.datapage;
+
+import org.apache.ignite.internal.pagememory.io.DataPagePayload;
+
+/**
+ * Controls page memory traversal.
+ *
+ * @see DataPageReader#traverse(long, PageMemoryTraversal, Object)
+ */
+public interface PageMemoryTraversal<T> {
+    /**
+     * Returned to signal that the traversal has to stop.
+     */
+    long STOP_TRAVERSAL = 0;
+
+    /**
+     * Consumes the currently traversed data payload and decides how to proceed.
+     *
+     * @param link     link to the current data payload
+     * @param pageAddr address of the current page
+     * @param payload  represents the row content
+     * @param arg      argument passed to the traversal
+     * @return next row link or {@link #STOP_TRAVERSAL} to stop the traversal
+     */
+    long consumePagePayload(long link, long pageAddr, DataPagePayload payload, T arg);
+
+    /**
+     * Called when the traversal has finished successfully.
+     */
+    default void finish() {
+        // no-op
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/ReadPageMemoryRowValue.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/ReadPageMemoryRowValue.java
new file mode 100644
index 000000000..98aad6a8a
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/ReadPageMemoryRowValue.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.datapage;
+
+import java.util.Objects;
+import org.apache.ignite.internal.pagememory.Storable;
+import org.apache.ignite.internal.pagememory.io.DataPagePayload;
+import org.apache.ignite.internal.pagememory.util.PageUtils;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Reads full data payload value (as byte array) from page memory. Supports fragmented values occupying more than one slot.
+ *
+ * <p>This works for the cases when the following conditions are satisfied:
+ * 1. Row data starts with a fixed-length header, which is followed by the 'value'; the 'value' ends the row data
+ * 2. Row data header contains 'value' size as an int at a fixed offset known beforehand
+ * 3. The beginning of the header, including the 'value' size, is always stored in the first slot (i.e.
+ * {@link Storable#headerSize()} is enough to include 'value' size)
+ */
+public abstract class ReadPageMemoryRowValue implements PageMemoryTraversal<Void> {
+    /**
+     * {@code true} until the first slot of the row (the one holding the header with the value size) has been consumed;
+     * {@code false} afterwards, while the following fragments of a fragmented value are being read.
+     */
+    private boolean readingFirstSlot = true;
+
+    private int valueSize;
+    /**
+     * Used to collect all the bytes of the target version value.
+     */
+    private byte @Nullable [] allValueBytes;
+    /**
+     * Number of bytes written to {@link #allValueBytes}.
+     */
+    private int transferredBytes = 0;
+
+    @Override
+    public long consumePagePayload(long link, long pageAddr, DataPagePayload payload, Void ignoredArg) {
+        if (readingFirstSlot) {
+            readingFirstSlot = false;
+            return readFullyOrStartReadingFragmented(pageAddr, payload);
+        } else {
+            // We are continuing reading a fragmented row.
+            return readNextFragment(pageAddr, payload);
+        }
+    }
+
+    private long readFullyOrStartReadingFragmented(long pageAddr, DataPagePayload payload) {
+        valueSize = readValueSize(pageAddr, payload);
+
+        if (!payload.hasMoreFragments()) {
+            return readFully(pageAddr, payload);
+        } else {
+            allValueBytes = new byte[valueSize];
+            transferredBytes = 0;
+
+            readValueFragmentToArray(pageAddr, payload, valueOffsetInFirstSlot());
+
+            return payload.nextLink();
+        }
+    }
+
+    private int readValueSize(long pageAddr, DataPagePayload payload) {
+        return PageUtils.getInt(pageAddr, payload.offset() + valueSizeOffsetInFirstSlot());
+    }
+
+    private long readFully(long pageAddr, DataPagePayload payload) {
+        allValueBytes = PageUtils.getBytes(pageAddr, payload.offset() + valueOffsetInFirstSlot(), valueSize);
+        return STOP_TRAVERSAL;
+    }
+
+    private void readValueFragmentToArray(long pageAddr, DataPagePayload payload, int offsetToValue) {
+        int valueBytesInSlot = payload.payloadSize() - offsetToValue; // Bytes before offsetToValue are header, not value.
+        PageUtils.getBytes(pageAddr, payload.offset() + offsetToValue, allValueBytes, transferredBytes, valueBytesInSlot);
+        transferredBytes += valueBytesInSlot;
+    }
+
+    private long readNextFragment(long pageAddr, DataPagePayload payload) {
+        assert allValueBytes != null;
+
+        readValueFragmentToArray(pageAddr, payload, 0);
+
+        if (payload.hasMoreFragments()) {
+            return payload.nextLink();
+        } else {
+            return STOP_TRAVERSAL;
+        }
+    }
+
+    /**
+     * Returns a byte array containing all the data of the page memory row in question. Should only be called after
+     * the traversal has stopped.
+     *
+     * @return a byte array containing all the data of the page memory row in question
+     */
+    public byte[] result() {
+        return Objects.requireNonNull(allValueBytes);
+    }
+
+    /**
+     * Memory offset into first slot at which the 'value' size is stored (as int).
+     *
+     * @return offset into first slot at which the 'value' size is stored (as int)
+     */
+    protected abstract int valueSizeOffsetInFirstSlot();
+
+    /**
+     * Memory offset into first slot at which the 'value' starts.
+     *
+     * @return offset into first slot at which the 'value' starts
+     */
+    protected abstract int valueOffsetInFirstSlot();
+
+    /**
+     * Resets the object to make it ready for use.
+     */
+    public void reset() {
+        readingFirstSlot = true;
+        allValueBytes = null;
+        transferredBytes = 0;
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/FreeList.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/FreeList.java
index de6664222..5c84fa367 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/FreeList.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/FreeList.java
@@ -47,7 +47,11 @@ public interface FreeList<T extends Storable> {
     void insertDataRows(Collection<T> rows, IoStatisticsHolder statHolder) throws IgniteInternalCheckedException;
 
     /**
-     * Updates a row by link.
+     * Makes an in-place update of a row identified by the link.
+     * This has a couple of restrictions:
+     * 1. The size of the payload must not change, otherwise the page will be broken (and next insertion will fail due
+     * to assertion failure).
+     * 2. The row cannot be fragmented. If it is, this will return {@code false} without doing anything.
      *
      * @param link Row link.
      * @param row New row data.
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/AbstractDataPageIo.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/AbstractDataPageIo.java
index 1b20779d5..c63fac03e 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/AbstractDataPageIo.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/AbstractDataPageIo.java
@@ -189,6 +189,11 @@ public abstract class AbstractDataPageIo<T extends Storable> extends PageIo {
     /** Minimum page overhead in bytes. */
     public static final int MIN_DATA_PAGE_OVERHEAD = ITEMS_OFF + ITEM_SIZE + PAYLOAD_LEN_SIZE + LINK_SIZE;
 
+    /** Special index value that signals that the given itemId does not exist. */
+    private static final int NON_EXISTENT_ITEM_INDEX = -1;
+    /** Special offset value that signals that the given itemId does not exist. */
+    private static final int NON_EXISTENT_ITEM_OFFSET = -1;
+
     /**
      * Constructor.
      *
@@ -345,7 +350,7 @@ public abstract class AbstractDataPageIo<T extends Storable> extends PageIo {
         // possibly a link to the next row fragment.
         freeSpace -= ITEM_SIZE + PAYLOAD_LEN_SIZE + LINK_SIZE;
 
-        return freeSpace < 0 ? 0 : freeSpace;
+        return Math.max(freeSpace, 0);
     }
 
     /**
@@ -483,6 +488,16 @@ public abstract class AbstractDataPageIo<T extends Storable> extends PageIo {
      * @return Found index of indirect item.
      */
     private int findIndirectItemIndex(long pageAddr, int itemId, int directCnt, int indirectCnt) {
+        int maybeOffset = findIndirectItemIndexOrNotFoundMarker(pageAddr, itemId, directCnt, indirectCnt);
+
+        if (maybeOffset == NON_EXISTENT_ITEM_INDEX) {
+            throw new IllegalStateException("Item not found: " + itemId);
+        }
+
+        return maybeOffset;
+    }
+
+    private int findIndirectItemIndexOrNotFoundMarker(long pageAddr, int itemId, int directCnt, int indirectCnt) {
         int low = directCnt;
         int high = directCnt + indirectCnt - 1;
 
@@ -500,7 +515,7 @@ public abstract class AbstractDataPageIo<T extends Storable> extends PageIo {
             }
         }
 
-        throw new IllegalStateException("Item not found: " + itemId);
+        return NON_EXISTENT_ITEM_INDEX;
     }
 
     /**
@@ -611,6 +626,7 @@ public abstract class AbstractDataPageIo<T extends Storable> extends PageIo {
 
         assert directCnt > 0 : "itemId=" + itemId + ", directCnt=" + directCnt + ", page=" + printPageLayout(pageAddr, pageSize);
 
+        final int directItemId;
         if (itemId >= directCnt) { // Need to do indirect lookup.
             int indirectCnt = getIndirectCount(pageAddr);
 
@@ -618,17 +634,65 @@ public abstract class AbstractDataPageIo<T extends Storable> extends PageIo {
             assert indirectCnt > 0 : "itemId=" + itemId + ", directCnt=" + directCnt + ", indirectCnt=" + indirectCnt
                     + ", page=" + printPageLayout(pageAddr, pageSize);
 
-            int indirectItemIdx = findIndirectItemIndex(pageAddr, itemId, directCnt, indirectCnt);
+            directItemId = resolveDirectItemIdFromIndirectItemId(pageAddr, itemId, directCnt, indirectCnt);
+        } else {
+            directItemId = itemId;
+        }
+
+        return directItemToOffset(getItem(pageAddr, directItemId));
+    }
 
-            assert indirectItemIdx >= directCnt : indirectItemIdx + " " + directCnt;
-            assert indirectItemIdx < directCnt + indirectCnt : indirectItemIdx + " " + directCnt + " " + indirectCnt;
+    private int resolveDirectItemIdFromIndirectItemId(long pageAddr, int itemId, int directCnt, int indirectCnt) {
+        int indirectItemIdx = findIndirectItemIndex(pageAddr, itemId, directCnt, indirectCnt);
 
-            itemId = directItemIndex(getItem(pageAddr, indirectItemIdx));
+        assert indirectItemIdx >= directCnt : indirectItemIdx + " " + directCnt;
+        assert indirectItemIdx < directCnt + indirectCnt : indirectItemIdx + " " + directCnt + " " + indirectCnt;
 
-            assert itemId >= 0 && itemId < directCnt : itemId + " " + directCnt + " " + indirectCnt; // Direct item.
+        int directItemId = directItemIndex(getItem(pageAddr, indirectItemIdx));
+
+        assert directItemId >= 0 && directItemId < directCnt : directItemId + " " + directCnt + " " + indirectCnt; // Direct item.
+        return directItemId;
+    }
+
+    protected int getDataOffsetOrNotFoundMarker(long pageAddr, int itemId, int pageSize) {
+        assert checkIndex(itemId) : itemId;
+
+        int directCnt = getDirectCount(pageAddr);
+
+        if (directCnt <= 0) {
+            return NON_EXISTENT_ITEM_OFFSET;
         }
 
-        return directItemToOffset(getItem(pageAddr, itemId));
+        int indirectCnt = getIndirectCount(pageAddr);
+
+        final int directItemId;
+        if (itemId >= directCnt) { // Need to do indirect lookup.
+            if (indirectCnt <= 0) {
+                return NON_EXISTENT_ITEM_OFFSET;
+            }
+
+            int indirectItemIdx = findIndirectItemIndexOrNotFoundMarker(pageAddr, itemId, directCnt, indirectCnt);
+            if (indirectItemIdx == NON_EXISTENT_ITEM_INDEX) {
+                return NON_EXISTENT_ITEM_OFFSET;
+            }
+
+            directItemId = resolveDirectItemIdFromIndirectItemId(pageAddr, itemId, directCnt, indirectCnt);
+        } else {
+            // it is a direct item
+
+            if (anyIndirectItemPointsAtDirectItem(pageAddr, itemId, directCnt, indirectCnt)) {
+                return NON_EXISTENT_ITEM_OFFSET;
+            }
+
+            directItemId = itemId;
+        }
+
+        return directItemToOffset(getItem(pageAddr, directItemId));
+    }
+
+    private boolean anyIndirectItemPointsAtDirectItem(long pageAddr, int itemId, int directCnt, int indirectCnt) {
+        int maybeIndirectItemIdx = findIndirectItemIndexOrNotFoundMarker(pageAddr, itemId, directCnt, indirectCnt);
+        return maybeIndirectItemIdx != NON_EXISTENT_ITEM_INDEX;
     }
 
     /**
@@ -673,6 +737,18 @@ public abstract class AbstractDataPageIo<T extends Storable> extends PageIo {
                 nextLink);
     }
 
+    /**
+     * Returns {@code true} iff an item with given ID exists in a page at the given address.
+     *
+     * @param pageAddr address where page data begins in memory
+     * @param itemId   item ID to check
+     * @param pageSize size of the page in bytes
+     * @return {@code true} iff an item with given ID exists in a page at the given address
+     */
+    public boolean itemExists(long pageAddr, int itemId, final int pageSize) {
+        return getDataOffsetOrNotFoundMarker(pageAddr, itemId, pageSize) != NON_EXISTENT_ITEM_OFFSET;
+    }
+
     /**
      * Returns Offset to start of actual fragment data.
      *
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/DataPagePayload.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/DataPagePayload.java
index e43ce8bcf..b5b66dd2d 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/DataPagePayload.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/DataPagePayload.java
@@ -67,6 +67,15 @@ public class DataPagePayload {
         return nextLink;
     }
 
+    /**
+     * Returns {@code true} if this payload links to next fragment.
+     *
+     * @return {@code true} if this payload links to next fragment
+     */
+    public boolean hasMoreFragments() {
+        return nextLink != 0;
+    }
+
     /**
      * Returns payload bytes.
      *
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
index 0d6ab8c78..7352ccbc3 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
@@ -28,6 +28,8 @@ import java.nio.channels.WritableByteChannel;
  * Heap byte buffer-based row.
  */
 public class ByteBufferRow implements BinaryRow {
+    public static final ByteOrder ORDER = ByteOrder.LITTLE_ENDIAN; // Byte order required of every row buffer.
+
     /** Row buffer. */
     private final ByteBuffer buf;
 
@@ -37,7 +39,7 @@ public class ByteBufferRow implements BinaryRow {
      * @param data Array representation of the row.
      */
     public ByteBufferRow(byte[] data) {
-        this(ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN));
+        this(ByteBuffer.wrap(data).order(ORDER));
     }
 
     /**
@@ -46,7 +48,7 @@ public class ByteBufferRow implements BinaryRow {
      * @param buf Buffer representing the row.
      */
     public ByteBufferRow(ByteBuffer buf) {
-        assert buf.order() == ByteOrder.LITTLE_ENDIAN;
+        assert buf.order() == ORDER;
         assert buf.position() == 0;
 
         this.buf = buf;
@@ -90,7 +92,7 @@ public class ByteBufferRow implements BinaryRow {
         int limit = buf.limit();
 
         try {
-            return buf.limit(off + len).position(off).slice().order(ByteOrder.LITTLE_ENDIAN);
+            return buf.limit(off + len).position(off).slice().order(ORDER);
         } finally {
             buf.position(0); // Reset bounds.
             buf.limit(limit);
@@ -105,7 +107,7 @@ public class ByteBufferRow implements BinaryRow {
         int limit = buf.limit();
 
         try {
-            return buf.limit(off + len).position(off).slice().order(ByteOrder.LITTLE_ENDIAN);
+            return buf.limit(off + len).position(off).slice().order(ORDER);
         } finally {
             buf.position(0); // Reset bounds.
             buf.limit(limit);
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index a007c4c72..31c9c578f 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -26,6 +26,8 @@ import org.jetbrains.annotations.Nullable;
 
 /**
  * Multi-versioned partition storage.
+ *
+ * <p>Each MvPartitionStorage instance represents exactly one partition.
  */
 public interface MvPartitionStorage extends AutoCloseable {
     /**
@@ -61,12 +63,17 @@ public interface MvPartitionStorage extends AutoCloseable {
     RowId insert(BinaryRow binaryRow, UUID txId) throws StorageException;
 
     /**
-     * Creates an uncommitted version, assigned to the given transaction id.
+     * Creates (or replaces) an uncommitted (aka pending) version, assigned to the given transaction id.
+     * In detail:
+     * - if there is no uncommitted version, a new uncommitted version is added
+     * - if there is an uncommitted version belonging to the same transaction, it gets replaced by the given version
+     * - if there is an uncommitted version belonging to a different transaction, {@link TxIdMismatchException} is thrown
      *
      * @param rowId Row id.
      * @param row Binary row to update. Key only row means value removal.
      * @param txId Transaction id.
-     * @return Previour row associated with the row id.
+     * @return Previous uncommitted row version associated with the row id, or {@code null} if no uncommitted version
+     *     existed before this call
      * @throws TxIdMismatchException If there's another pending update associated with different transaction id.
      * @throws StorageException If failed to write data to the storage.
      */
@@ -76,7 +83,7 @@ public interface MvPartitionStorage extends AutoCloseable {
      * Aborts a pending update of the ongoing uncommitted transaction. Invoked during rollback.
      *
      * @param rowId Row id.
-     * @return Previour row associated with the row id.
+     * @return Previous uncommitted row version associated with the row id.
      * @throws StorageException If failed to write data to the storage.
      */
     @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException;
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/NoUncommittedVersionException.java
similarity index 67%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
copy to modules/storage-api/src/main/java/org/apache/ignite/internal/storage/NoUncommittedVersionException.java
index dbc6e3c2f..33ceb3447 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/NoUncommittedVersionException.java
@@ -15,22 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.pagememory;
+package org.apache.ignite.internal.storage;
 
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.lang.IgniteInternalException;
 
 /**
- * Data region based on {@link PageMemory}.
+ * Thrown when an operation is invoked that requires an uncommitted version to exist, but no such version exists in reality.
  */
-public interface PageMemoryDataRegion {
-    /**
-     * Returns {@link true} if the date region is persistent.
-     */
-    boolean persistent();
-
-    /**
-     * Returns page memory, {@code null} if not started.
-     */
-    @Nullable
-    PageMemory pageMemory();
+public class NoUncommittedVersionException extends IgniteInternalException {
 }
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index dbb6afa7d..7418438a5 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -19,7 +19,11 @@ package org.apache.ignite.internal.storage;
 
 import static java.util.stream.Collectors.toList;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -31,246 +35,238 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.UUID;
+import java.util.function.Predicate;
 import java.util.stream.StreamSupport;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.tx.Timestamp;
 import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.Test;
 
 /**
  * Base test for MV partition storages.
  */
-public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest {
-    /**
-     * Creates a storage instance for testing.
-     */
-    protected abstract MvPartitionStorage partitionStorage();
+public abstract class AbstractMvPartitionStorageTest<S extends MvPartitionStorage> extends BaseMvStoragesTest {
+    protected S storage;
+
+    protected final UUID txId = newTransactionId();
+
+    private final TestKey key = new TestKey(10, "foo");
+    private final TestValue value = new TestValue(20, "bar");
+    protected final BinaryRow binaryRow = binaryRow(key, value);
+    private final TestValue value2 = new TestValue(21, "bar2");
+    protected final BinaryRow binaryRow2 = binaryRow(key, value2);
 
     /**
-     * Tests that reads and scan from empty storage return empty results.
+     * Tests that reads from empty storage return empty results.
      */
     @Test
-    public void testEmpty() throws Exception {
-        MvPartitionStorage pk = partitionStorage();
+    public void testReadsFromEmpty() {
+        RowId rowId = insertAndAbortWrite();
 
-        RowId rowId = pk.insert(binaryRow(new TestKey(1, "1"), new TestValue(1, "1")), UUID.randomUUID());
+        assertEquals(partitionId(), rowId.partitionId());
 
-        pk.abortWrite(rowId);
+        assertNull(storage.read(rowId, newTransactionId()));
+        assertNull(storage.read(rowId, Timestamp.nextVersion()));
+    }
 
-        assertEquals(0, rowId.partitionId());
+    private RowId insertAndAbortWrite() {
+        RowId rowId = storage.insert(binaryRow, txId);
 
-        // Read.
-        assertNull(pk.read(rowId, UUID.randomUUID()));
-        assertNull(pk.read(rowId, Timestamp.nextVersion()));
+        storage.abortWrite(rowId);
 
-        // Scan.
-        assertEquals(List.of(), convert(pk.scan(row -> true, UUID.randomUUID())));
-        assertEquals(List.of(), convert(pk.scan(row -> true, Timestamp.nextVersion())));
+        return rowId;
     }
 
-    /**
-     * Tests basic invariants of {@link MvPartitionStorage#addWrite(BinaryRow, UUID)}.
-     */
     @Test
-    public void testAddWrite() throws Exception {
-        MvPartitionStorage pk = partitionStorage();
+    public void testScanOverEmpty() throws Exception {
+        insertAndAbortWrite();
 
-        TestKey key = new TestKey(10, "foo");
-        TestValue value = new TestValue(20, "bar");
+        assertEquals(List.of(), convert(storage.scan(row -> true, newTransactionId())));
+        assertEquals(List.of(), convert(storage.scan(row -> true, Timestamp.nextVersion())));
+    }
 
-        BinaryRow binaryRow = binaryRow(key, value);
+    protected int partitionId() {
+        return 0;
+    }
 
-        UUID txId = UUID.randomUUID();
+    protected UUID newTransactionId() {
+        return UUID.randomUUID();
+    }
 
-        RowId rowId = pk.insert(binaryRow, txId);
+    /**
+     * Tests basic invariants of {@link MvPartitionStorage#addWrite(RowId, BinaryRow, UUID)}.
+     */
+    @Test
+    public void testAddWrite() {
+        RowId rowId = storage.insert(binaryRow, txId);
 
         // Attempt to write from another transaction.
-        assertThrows(TxIdMismatchException.class, () -> pk.addWrite(rowId, binaryRow, UUID.randomUUID()));
+        assertThrows(TxIdMismatchException.class, () -> storage.addWrite(rowId, binaryRow, newTransactionId()));
 
         // Write from the same transaction.
-        pk.addWrite(rowId, binaryRow, txId);
+        storage.addWrite(rowId, binaryRow, txId);
 
         // Read without timestamp returns uncommited row.
-        assertEquals(value, value(pk.read(rowId, txId)));
+        assertRowMatches(storage.read(rowId, txId), binaryRow);
 
         // Read with wrong transaction id should throw exception.
-        assertThrows(TxIdMismatchException.class, () -> pk.read(rowId, UUID.randomUUID()));
+        assertThrows(TxIdMismatchException.class, () -> storage.read(rowId, newTransactionId()));
 
         // Read with timestamp returns null.
-        assertNull(pk.read(rowId, Timestamp.nextVersion()));
+        assertNull(storage.read(rowId, Timestamp.nextVersion()));
     }
 
     /**
-     * Tests basic invariants of {@link MvPartitionStorage#abortWrite(BinaryRow)}.
+     * Tests basic invariants of {@link MvPartitionStorage#abortWrite(RowId)}.
      */
     @Test
-    public void testAbortWrite() throws Exception {
-        MvPartitionStorage pk = partitionStorage();
-
-        TestKey key = new TestKey(10, "foo");
-        TestValue value = new TestValue(20, "bar");
-
-        UUID txId = UUID.randomUUID();
+    public void testAbortWrite() {
+        RowId rowId = storage.insert(binaryRow(key, value), txId);
 
-        RowId rowId = pk.insert(binaryRow(key, value), txId);
-
-        pk.abortWrite(rowId);
+        storage.abortWrite(rowId);
 
         // Aborted row can't be read.
-        assertNull(pk.read(rowId, txId));
+        assertNull(storage.read(rowId, txId));
     }
 
     /**
-     * Tests basic invariants of {@link MvPartitionStorage#commitWrite(BinaryRow, Timestamp)}.
+     * Tests basic invariants of {@link MvPartitionStorage#commitWrite(RowId, Timestamp)}.
      */
     @Test
-    public void testCommitWrite() throws Exception {
-        MvPartitionStorage pk = partitionStorage();
-
-        TestKey key = new TestKey(10, "foo");
-        TestValue value = new TestValue(20, "bar");
-
-        BinaryRow binaryRow = binaryRow(key, value);
-
-        UUID txId = UUID.randomUUID();
-
-        RowId rowId = pk.insert(binaryRow, txId);
+    public void testCommitWrite() {
+        RowId rowId = storage.insert(binaryRow, txId);
 
         Timestamp tsBefore = Timestamp.nextVersion();
 
         Timestamp tsExact = Timestamp.nextVersion();
-        pk.commitWrite(rowId, tsExact);
+        storage.commitWrite(rowId, tsExact);
 
         Timestamp tsAfter = Timestamp.nextVersion();
 
         // Row is invisible at the time before writing.
-        assertNull(pk.read(rowId, tsBefore));
+        assertNull(storage.read(rowId, tsBefore));
 
         // Row is valid at the time during and after writing.
-        assertEquals(value, value(pk.read(rowId, txId)));
-        assertEquals(value, value(pk.read(rowId, tsExact)));
-        assertEquals(value, value(pk.read(rowId, tsAfter)));
+        assertRowMatches(storage.read(rowId, tsExact), binaryRow);
+        assertRowMatches(storage.read(rowId, tsAfter), binaryRow);
 
         TestValue newValue = new TestValue(30, "duh");
 
-        UUID newTxId = UUID.randomUUID();
+        UUID newTxId = newTransactionId();
 
-        pk.addWrite(rowId, binaryRow(key, newValue), newTxId);
+        BinaryRow newRow = binaryRow(key, newValue);
+        storage.addWrite(rowId, newRow, newTxId);
 
         // Same checks, but now there are two different versions.
-        assertNull(pk.read(rowId, tsBefore));
+        assertNull(storage.read(rowId, tsBefore));
 
-        assertEquals(newValue, value(pk.read(rowId, newTxId)));
+        assertRowMatches(storage.read(rowId, newTxId), newRow);
 
-        assertEquals(value, value(pk.read(rowId, tsExact)));
-        assertEquals(value, value(pk.read(rowId, tsAfter)));
-        assertEquals(value, value(pk.read(rowId, Timestamp.nextVersion())));
+        assertRowMatches(storage.read(rowId, tsExact), binaryRow);
+        assertRowMatches(storage.read(rowId, tsAfter), binaryRow);
+        assertRowMatches(storage.read(rowId, Timestamp.nextVersion()), binaryRow);
 
         // Only latest time behavior changes after commit.
-        pk.commitWrite(rowId, Timestamp.nextVersion());
+        storage.commitWrite(rowId, Timestamp.nextVersion());
 
-        assertEquals(newValue, value(pk.read(rowId, newTxId)));
+        assertRowMatches(storage.read(rowId, newTxId), newRow);
 
-        assertEquals(value, value(pk.read(rowId, tsExact)));
-        assertEquals(value, value(pk.read(rowId, tsAfter)));
+        assertRowMatches(storage.read(rowId, tsExact), binaryRow);
+        assertRowMatches(storage.read(rowId, tsAfter), binaryRow);
 
-        assertEquals(newValue, value(pk.read(rowId, Timestamp.nextVersion())));
+        assertRowMatches(storage.read(rowId, Timestamp.nextVersion()), newRow);
 
         // Remove.
-        UUID removeTxId = UUID.randomUUID();
+        UUID removeTxId = newTransactionId();
 
-        pk.addWrite(rowId, null, removeTxId);
+        storage.addWrite(rowId, null, removeTxId);
 
-        assertNull(pk.read(rowId, tsBefore));
+        assertNull(storage.read(rowId, tsBefore));
 
-        assertNull(pk.read(rowId, removeTxId));
+        assertNull(storage.read(rowId, removeTxId));
 
-        assertEquals(value, value(pk.read(rowId, tsExact)));
-        assertEquals(value, value(pk.read(rowId, tsAfter)));
+        assertRowMatches(storage.read(rowId, tsExact), binaryRow);
+        assertRowMatches(storage.read(rowId, tsAfter), binaryRow);
 
-        assertEquals(newValue, value(pk.read(rowId, Timestamp.nextVersion())));
+        assertRowMatches(storage.read(rowId, Timestamp.nextVersion()), newRow);
 
         // Commit remove.
         Timestamp removeTs = Timestamp.nextVersion();
-        pk.commitWrite(rowId, removeTs);
+        storage.commitWrite(rowId, removeTs);
 
-        assertNull(pk.read(rowId, tsBefore));
+        assertNull(storage.read(rowId, tsBefore));
 
-        assertNull(pk.read(rowId, removeTxId));
-        assertNull(pk.read(rowId, removeTs));
-        assertNull(pk.read(rowId, Timestamp.nextVersion()));
+        assertNull(storage.read(rowId, removeTxId));
+        assertNull(storage.read(rowId, removeTs));
+        assertNull(storage.read(rowId, Timestamp.nextVersion()));
 
-        assertEquals(value, value(pk.read(rowId, tsExact)));
-        assertEquals(value, value(pk.read(rowId, tsAfter)));
+        assertRowMatches(storage.read(rowId, tsExact), binaryRow);
+        assertRowMatches(storage.read(rowId, tsAfter), binaryRow);
     }
 
     /**
-     * Tests basic invariants of {@link MvPartitionStorage#scan(Timestamp)}.
+     * Tests basic invariants of {@link MvPartitionStorage#scan(Predicate, Timestamp)}.
      */
     @Test
     public void testScan() throws Exception {
-        MvPartitionStorage pk = partitionStorage();
-
         TestKey key1 = new TestKey(1, "1");
         TestValue value1 = new TestValue(10, "xxx");
 
         TestKey key2 = new TestKey(2, "2");
         TestValue value2 = new TestValue(20, "yyy");
 
-        UUID txId1 = UUID.randomUUID();
-        RowId rowId1 = pk.insert(binaryRow(key1, value1), txId1);
+        UUID txId1 = newTransactionId();
+        RowId rowId1 = storage.insert(binaryRow(key1, value1), txId1);
 
-        UUID txId2 = UUID.randomUUID();
-        RowId rowId2 = pk.insert(binaryRow(key2, value2), txId2);
+        UUID txId2 = newTransactionId();
+        RowId rowId2 = storage.insert(binaryRow(key2, value2), txId2);
 
         // Scan with and without filters.
-        assertThrows(TxIdMismatchException.class, () -> convert(pk.scan(row -> true, txId1)));
-        assertThrows(TxIdMismatchException.class, () -> convert(pk.scan(row -> true, txId2)));
+        assertThrows(TxIdMismatchException.class, () -> convert(storage.scan(row -> true, txId1)));
+        assertThrows(TxIdMismatchException.class, () -> convert(storage.scan(row -> true, txId2)));
 
-        assertEquals(List.of(value1), convert(pk.scan(row -> key(row).intKey == 1, txId1)));
-        assertEquals(List.of(value2), convert(pk.scan(row -> key(row).intKey == 2, txId2)));
+        assertEquals(List.of(value1), convert(storage.scan(row -> key(row).intKey == 1, txId1)));
+        assertEquals(List.of(value2), convert(storage.scan(row -> key(row).intKey == 2, txId2)));
 
         Timestamp ts1 = Timestamp.nextVersion();
 
         Timestamp ts2 = Timestamp.nextVersion();
-        pk.commitWrite(rowId1, ts2);
+        storage.commitWrite(rowId1, ts2);
 
         Timestamp ts3 = Timestamp.nextVersion();
 
         Timestamp ts4 = Timestamp.nextVersion();
-        pk.commitWrite(rowId2, ts4);
+        storage.commitWrite(rowId2, ts4);
 
         Timestamp ts5 = Timestamp.nextVersion();
 
         // Full scan with various timestamp values.
-        assertEquals(List.of(), convert(pk.scan(row -> true, ts1)));
+        assertEquals(List.of(), convert(storage.scan(row -> true, ts1)));
 
-        assertEquals(List.of(value1), convert(pk.scan(row -> true, ts2)));
-        assertEquals(List.of(value1), convert(pk.scan(row -> true, ts3)));
+        assertEquals(List.of(value1), convert(storage.scan(row -> true, ts2)));
+        assertEquals(List.of(value1), convert(storage.scan(row -> true, ts3)));
 
-        assertEquals(List.of(value1, value2), convert(pk.scan(row -> true, ts4)));
-        assertEquals(List.of(value1, value2), convert(pk.scan(row -> true, ts5)));
+        assertEquals(List.of(value1, value2), convert(storage.scan(row -> true, ts4)));
+        assertEquals(List.of(value1, value2), convert(storage.scan(row -> true, ts5)));
     }
 
     @Test
     public void testScanCursorInvariants() {
-        MvPartitionStorage pk = partitionStorage();
-
         TestValue value1 = new TestValue(10, "xxx");
 
         TestValue value2 = new TestValue(20, "yyy");
 
-        UUID txId = UUID.randomUUID();
-
-        RowId rowId1 = pk.insert(binaryRow(new TestKey(1, "1"), value1), txId);
-        pk.commitWrite(rowId1, Timestamp.nextVersion());
+        RowId rowId1 = storage.insert(binaryRow(new TestKey(1, "1"), value1), txId);
+        storage.commitWrite(rowId1, Timestamp.nextVersion());
 
-        RowId rowId2 = pk.insert(binaryRow(new TestKey(2, "2"), value2), txId);
-        pk.commitWrite(rowId2, Timestamp.nextVersion());
+        RowId rowId2 = storage.insert(binaryRow(new TestKey(2, "2"), value2), txId);
+        storage.commitWrite(rowId2, Timestamp.nextVersion());
 
-        Cursor<BinaryRow> cursor = pk.scan(row -> true, txId);
+        Cursor<BinaryRow> cursor = storage.scan(row -> true, txId);
 
         assertTrue(cursor.hasNext());
+        //noinspection ConstantConditions
         assertTrue(cursor.hasNext());
 
         List<TestValue> res = new ArrayList<>();
@@ -278,11 +274,13 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
         res.add(value(cursor.next()));
 
         assertTrue(cursor.hasNext());
+        //noinspection ConstantConditions
         assertTrue(cursor.hasNext());
 
         res.add(value(cursor.next()));
 
         assertFalse(cursor.hasNext());
+        //noinspection ConstantConditions
         assertFalse(cursor.hasNext());
 
         assertThrows(NoSuchElementException.class, () -> cursor.next());
@@ -298,4 +296,320 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
                     .collect(toList());
         }
     }
+
+    @Test
+    void readOfUncommittedRowWithCorrespondingTransactionIdReturnsTheRow() {
+        RowId rowId = storage.insert(binaryRow, txId);
+
+        BinaryRow foundRow = storage.read(rowId, txId);
+
+        assertRowMatches(foundRow, binaryRow);
+    }
+
+    private void assertRowMatches(@Nullable BinaryRow rowUnderQuestion, BinaryRow expectedRow) {
+        assertThat(rowUnderQuestion, is(notNullValue()));
+        assertThat(rowUnderQuestion.bytes(), is(equalTo(expectedRow.bytes())));
+    }
+
+    @Test
+    void readOfUncommittedRowWithDifferentTransactionIdThrows() {
+        RowId rowId = storage.insert(binaryRow, txId);
+
+        assertThrows(TxIdMismatchException.class, () -> storage.read(rowId, newTransactionId()));
+    }
+
+    @Test
+    void readOfCommittedRowWithAnyTransactionIdReturnsTheRow() {
+        RowId rowId = storage.insert(binaryRow, txId);
+        storage.commitWrite(rowId, Timestamp.nextVersion());
+
+        BinaryRow foundRow = storage.read(rowId, newTransactionId());
+
+        assertRowMatches(foundRow, binaryRow);
+    }
+
+    @Test
+    void readsUncommittedVersionEvenWhenThereIsCommittedVersionBeforeIt() {
+        RowId rowId1 = storage.insert(binaryRow, txId);
+        storage.commitWrite(rowId1, Timestamp.nextVersion());
+
+        RowId rowId2 = storage.insert(binaryRow2, txId);
+
+        BinaryRow foundRow = storage.read(rowId2, txId);
+
+        assertRowMatches(foundRow, binaryRow2);
+    }
+
+    @Test
+    void readsCommittedVersionEvenWhenThereIsCommittedVersionBeforeIt() {
+        RowId rowId1 = storage.insert(binaryRow, txId);
+        storage.commitWrite(rowId1, Timestamp.nextVersion());
+
+        RowId rowId2 = storage.insert(binaryRow2, txId);
+        storage.commitWrite(rowId2, Timestamp.nextVersion());
+
+        BinaryRow foundRow = storage.read(rowId2, txId);
+
+        assertRowMatches(foundRow, binaryRow2);
+    }
+
+    @Test
+    void readByExactlyCommitTimestampFindsRow() {
+        RowId rowId = storage.insert(binaryRow, txId);
+        Timestamp commitTimestamp = Timestamp.nextVersion();
+        storage.commitWrite(rowId, commitTimestamp);
+
+        BinaryRow foundRow = storage.read(rowId, commitTimestamp);
+
+        assertRowMatches(foundRow, binaryRow);
+    }
+
+    @Test
+    void readByTimestampAfterCommitTimestampFindsRow() {
+        RowId rowId = storage.insert(binaryRow, txId);
+        Timestamp commitTimestamp = Timestamp.nextVersion();
+        storage.commitWrite(rowId, commitTimestamp);
+
+        Timestamp afterCommit = Timestamp.nextVersion();
+        BinaryRow foundRow = storage.read(rowId, afterCommit);
+
+        assertRowMatches(foundRow, binaryRow);
+    }
+
+    @Test
+    void readByTimestampBeforeFirstVersionCommitTimestampFindsNothing() {
+        Timestamp beforeCommit = Timestamp.nextVersion();
+
+        RowId rowId = storage.insert(binaryRow, txId);
+        Timestamp commitTimestamp = Timestamp.nextVersion();
+        storage.commitWrite(rowId, commitTimestamp);
+
+        BinaryRow foundRow = storage.read(rowId, beforeCommit);
+
+        assertThat(foundRow, is(nullValue()));
+    }
+
+    @Test
+    void readByTimestampOfLastVersionFindsLastVersion() {
+        RowId rowId = storage.insert(binaryRow, txId);
+        Timestamp firstVersionTs = Timestamp.nextVersion();
+        storage.commitWrite(rowId, firstVersionTs);
+
+        storage.addWrite(rowId, binaryRow2, newTransactionId());
+        Timestamp secondVersionTs = Timestamp.nextVersion();
+        storage.commitWrite(rowId, secondVersionTs);
+
+        BinaryRow foundRow = storage.read(rowId, secondVersionTs);
+
+        assertRowMatches(foundRow, binaryRow2);
+    }
+
+    @Test
+    void readByTimestampOfPreviousVersionFindsPreviousVersion() {
+        RowId rowId = storage.insert(binaryRow, txId);
+        Timestamp firstVersionTs = Timestamp.nextVersion();
+        storage.commitWrite(rowId, firstVersionTs);
+
+        storage.addWrite(rowId, binaryRow2, newTransactionId());
+        storage.commitWrite(rowId, Timestamp.nextVersion());
+
+        BinaryRow foundRow = storage.read(rowId, firstVersionTs);
+
+        assertRowMatches(foundRow, binaryRow);
+    }
+
+    @Test
+    void readByTimestampBetweenVersionsFindsPreviousVersion() {
+        RowId rowId = storage.insert(binaryRow, txId);
+        Timestamp firstVersionTs = Timestamp.nextVersion();
+        storage.commitWrite(rowId, firstVersionTs);
+
+        Timestamp tsInBetween = Timestamp.nextVersion();
+
+        storage.addWrite(rowId, binaryRow2, newTransactionId());
+        storage.commitWrite(rowId, Timestamp.nextVersion());
+
+        BinaryRow foundRow = storage.read(rowId, tsInBetween);
+
+        assertRowMatches(foundRow, binaryRow);
+    }
+
+    @Test
+    void readByTimestampIgnoresUncommittedVersion() {
+        RowId rowId = storage.insert(binaryRow, newTransactionId());
+        storage.commitWrite(rowId, Timestamp.nextVersion());
+
+        storage.addWrite(rowId, binaryRow2, newTransactionId());
+
+        Timestamp latestTs = Timestamp.nextVersion();
+        BinaryRow foundRow = storage.read(rowId, latestTs);
+
+        assertRowMatches(foundRow, binaryRow);
+    }
+
+    @Test
+    void addWriteWithDifferentTxIdThrows() {
+        RowId rowId = storage.insert(binaryRow, txId);
+
+        assertThrows(TxIdMismatchException.class, () -> storage.addWrite(rowId, binaryRow2, newTransactionId()));
+    }
+
+    @Test
+    void secondUncommittedWriteWithSameTxIdReplacesExistingUncommittedWrite() {
+        RowId rowId = storage.insert(binaryRow, txId);
+
+        storage.addWrite(rowId, binaryRow2, txId);
+
+        BinaryRow foundRow = storage.read(rowId, txId);
+
+        assertRowMatches(foundRow, binaryRow2);
+    }
+
+    @Test
+    void addWriteReturnsUncommittedVersionIfItExists() {
+        RowId rowId = storage.insert(binaryRow, txId);
+
+        BinaryRow returnedRow = storage.addWrite(rowId, binaryRow2, txId);
+
+        assertRowMatches(returnedRow, binaryRow);
+    }
+
+    @Test
+    void addWriteReturnsNullIfNoUncommittedVersionExists() {
+        RowId rowId = storage.insert(binaryRow, newTransactionId());
+        storage.commitWrite(rowId, Timestamp.nextVersion());
+
+        BinaryRow returnedRow = storage.addWrite(rowId, binaryRow2, txId);
+
+        assertThat(returnedRow, is(nullValue()));
+    }
+
+    @Test
+    void afterRemovalReadWithTxIdFindsNothing() {
+        RowId rowId = storage.insert(binaryRow, newTransactionId());
+        storage.commitWrite(rowId, Timestamp.nextVersion());
+
+        storage.addWrite(rowId, null, txId);
+
+        BinaryRow foundRow = storage.read(rowId, txId);
+
+        assertThat(foundRow, is(nullValue()));
+    }
+
+    @Test
+    void afterRemovalReadByLatestTimestampFindsNothing() {
+        RowId rowId = storage.insert(binaryRow, newTransactionId());
+        storage.commitWrite(rowId, Timestamp.nextVersion());
+
+        storage.addWrite(rowId, null, newTransactionId());
+        storage.commitWrite(rowId, Timestamp.nextVersion());
+
+        BinaryRow foundRow = storage.read(rowId, Timestamp.nextVersion());
+
+        assertThat(foundRow, is(nullValue()));
+    }
+
+    @Test
+    void afterRemovalPreviousVersionRemainsAccessibleByTimestamp() {
+        RowId rowId = storage.insert(binaryRow, newTransactionId());
+        Timestamp firstTimestamp = Timestamp.nextVersion();
+        storage.commitWrite(rowId, firstTimestamp);
+
+        storage.addWrite(rowId, null, newTransactionId());
+        storage.commitWrite(rowId, Timestamp.nextVersion());
+
+        BinaryRow foundRow = storage.read(rowId, firstTimestamp);
+
+        assertRowMatches(foundRow, binaryRow);
+    }
+
+    @Test
+    void removalReturnsUncommittedRowVersionIfItExists() {
+        RowId rowId = storage.insert(binaryRow, txId);
+
+        BinaryRow rowFromRemoval = storage.addWrite(rowId, null, txId);
+
+        assertRowMatches(rowFromRemoval, binaryRow);
+    }
+
+    @Test
+    void removalReturnsNullIfNoUncommittedVersionExists() {
+        RowId rowId = storage.insert(binaryRow, newTransactionId());
+        storage.commitWrite(rowId, Timestamp.nextVersion());
+
+        BinaryRow rowFromRemoval = storage.addWrite(rowId, null, newTransactionId());
+
+        assertThat(rowFromRemoval, is(nullValue()));
+    }
+
+    @Test
+    void commitWriteMakesVersionAvailableToReadByTimestamp() {
+        RowId rowId = storage.insert(binaryRow, txId);
+
+        storage.commitWrite(rowId, Timestamp.nextVersion());
+
+        BinaryRow foundRow = storage.read(rowId, Timestamp.nextVersion());
+
+        assertRowMatches(foundRow, binaryRow);
+    }
+
+    @Test
+    void abortWriteFailsIfNoUncommittedVersionExists() {
+        RowId rowId = storage.insert(binaryRow, newTransactionId());
+        storage.commitWrite(rowId, Timestamp.nextVersion());
+
+        assertThrows(NoUncommittedVersionException.class, () -> storage.abortWrite(rowId));
+    }
+
+    @Test
+    void abortWriteRemovesUncommittedVersion() {
+        RowId rowId = storage.insert(binaryRow, newTransactionId());
+        storage.commitWrite(rowId, Timestamp.nextVersion());
+
+        storage.addWrite(rowId, binaryRow2, txId);
+
+        storage.abortWrite(rowId);
+
+        BinaryRow foundRow = storage.read(rowId, txId);
+
+        assertRowMatches(foundRow, binaryRow);
+    }
+
+    @Test
+    void abortOfInsertMakesRowNonExistentForReadByTimestamp() {
+        RowId rowId = storage.insert(binaryRow, newTransactionId());
+
+        storage.abortWrite(rowId);
+
+        BinaryRow foundRow = storage.read(rowId, Timestamp.nextVersion());
+
+        assertThat(foundRow, is(nullValue()));
+    }
+
+    @Test
+    void abortOfInsertMakesRowNonExistentForReadWithTxId() {
+        RowId rowId = insertAndAbortWrite();
+
+        BinaryRow foundRow = storage.read(rowId, txId);
+
+        assertThat(foundRow, is(nullValue()));
+    }
+
+    @Test
+    void abortWriteReturnsTheRemovedVersion() {
+        RowId rowId = storage.insert(binaryRow, txId);
+
+        BinaryRow returnedRow = storage.abortWrite(rowId);
+
+        assertRowMatches(returnedRow, binaryRow);
+    }
+
+    @Test
+    void scanWithTxIdThrowsWhenOtherTransactionHasUncommittedChanges() {
+        storage.insert(binaryRow, txId);
+
+        Cursor<BinaryRow> cursor = storage.scan(k -> true, newTransactionId());
+
+        assertThrows(TxIdMismatchException.class, cursor::next);
+    }
 }
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
index 57b5fd8c3..461c85687 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
@@ -86,7 +86,6 @@ public abstract class BaseMvStoragesTest {
         }
     }
 
-    @Nullable
     protected static TestKey key(BinaryRow binaryRow) {
         try {
             return kvMarshaller.unmarshalKey(new Row(schemaDescriptor, binaryRow));
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java
index d66a2c261..933886d01 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.function.Predicate;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.NoUncommittedVersionException;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
@@ -116,8 +117,11 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
         BinaryRow[] res = {null};
 
         map.computeIfPresent(rowId, (ignored, versionChain) -> {
-            assert versionChain != null;
-            assert versionChain.begin == null && versionChain.txId != null;
+            if (versionChain.txId == null) {
+                throw new NoUncommittedVersionException();
+            }
+
+            assert versionChain.begin == null;
 
             cleanupIndexesForAbortedRow(versionChain, rowId);
 
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorageTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorageTest.java
index 86fc7825f..ba32f5d24 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorageTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorageTest.java
@@ -19,18 +19,16 @@ package org.apache.ignite.internal.storage.basic;
 
 import java.util.List;
 import org.apache.ignite.internal.storage.AbstractMvPartitionStorageTest;
-import org.apache.ignite.internal.storage.MvPartitionStorage;
 
 /**
  * MV partition storage test implementation for {@link TestMvPartitionStorage} class.
  */
-public class TestMvPartitionStorageTest extends AbstractMvPartitionStorageTest {
-    /** Test partition storage instance. */
-    private final TestMvPartitionStorage storage = new TestMvPartitionStorage(List.of(), 0);
-
-    /** {@inheritDoc} */
-    @Override
-    protected MvPartitionStorage partitionStorage() {
-        return storage;
+public class TestMvPartitionStorageTest extends AbstractMvPartitionStorageTest<TestMvPartitionStorage> {
+    /**
+     * Creates new instance.
+     */
+    public TestMvPartitionStorageTest() {
+        storage = new TestMvPartitionStorage(List.of(), 0);
     }
+
 }
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryDataRegion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryDataRegion.java
index 8d535ac84..4e5863076 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryDataRegion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryDataRegion.java
@@ -17,12 +17,12 @@
 
 package org.apache.ignite.internal.storage.pagememory;
 
+import java.util.Objects;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.pagememory.PageMemory;
 import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
 import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryDataRegionConfiguration;
 import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Abstract data region for {@link PageMemoryStorageEngine}. Based on a {@link PageMemory}.
@@ -68,7 +68,7 @@ abstract class AbstractPageMemoryDataRegion implements PageMemoryDataRegion, Ign
     /**
      * Returns page memory, {@code null} if not {@link #start started}.
      */
-    public @Nullable PageMemory pageMemory() {
-        return pageMemory;
+    public PageMemory pageMemory() {
+        return Objects.requireNonNull(pageMemory);
     }
 }
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryStorageEngine.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryStorageEngine.java
index 746761a12..3270b5941 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryStorageEngine.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryStorageEngine.java
@@ -27,7 +27,6 @@ import org.apache.ignite.internal.pagememory.PageMemory;
 import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.engine.StorageEngine;
-import org.apache.ignite.internal.storage.engine.TableStorage;
 import org.apache.ignite.internal.storage.pagememory.configuration.schema.PageMemoryDataStorageView;
 import org.apache.ignite.internal.storage.pagememory.configuration.schema.PageMemoryStorageEngineConfiguration;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -98,7 +97,7 @@ public class PageMemoryStorageEngine implements StorageEngine {
 
     /** {@inheritDoc} */
     @Override
-    public TableStorage createTable(TableConfiguration tableCfg) {
+    public PageMemoryTableStorage createTable(TableConfiguration tableCfg) {
         TableView tableView = tableCfg.value();
 
         assert tableView.dataStorage().name().equals(ENGINE_NAME) : tableView.dataStorage().name();
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryTableStorage.java
index a109c5524..80d1489da 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryTableStorage.java
@@ -29,16 +29,18 @@ import org.apache.ignite.internal.storage.PartitionStorage;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.engine.TableStorage;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import org.apache.ignite.internal.storage.pagememory.mv.PageMemoryMvPartitionStorage;
 import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * Abstract table storage implementation based on {@link PageMemory}.
  */
 // TODO: IGNITE-16641 Add support for persistent case.
 // TODO: IGNITE-16642 Support indexes.
-abstract class PageMemoryTableStorage implements TableStorage {
+public abstract class PageMemoryTableStorage implements TableStorage {
     protected final AbstractPageMemoryDataRegion dataRegion;
 
     protected final TableConfiguration tableCfg;
@@ -178,4 +180,17 @@ abstract class PageMemoryTableStorage implements TableStorage {
      * @throws StorageException If there is an error while creating the partition storage.
      */
     protected abstract PageMemoryPartitionStorage createPartitionStorage(int partId) throws StorageException;
+
+    /** Creates an MV partition storage. This API is not ready yet, but MV storages need to be tested anyway. */
+    @TestOnly
+    public PageMemoryMvPartitionStorage createMvPartitionStorage(int partitionId) {
+        VolatilePageMemoryDataRegion volatileRegion = (VolatilePageMemoryDataRegion) dataRegion;
+        return new PageMemoryMvPartitionStorage(
+                partitionId,
+                tableCfg.value(),
+                dataRegion,
+                volatileRegion.versionChainFreeList(),
+                volatileRegion.rowVersionFreeList()
+        );
+    }
 }
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableTree.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableTree.java
index 11303caf8..cdcab829a 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableTree.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableTree.java
@@ -154,7 +154,7 @@ public class TableTree extends BplusTree<TableSearchRow, TableDataRow> {
 
                     DataPagePayload data = dataIo.readPayload(pageAddr, itemId, pageSize);
 
-                    if (data.nextLink() == 0 && nextLink == link) {
+                    if (!data.hasMoreFragments() && nextLink == link) {
                         // Good luck: we can read the row without fragments.
                         return readFullRow(link, hash, rowData, pageAddr + data.offset());
                     }
@@ -251,7 +251,7 @@ public class TableTree extends BplusTree<TableSearchRow, TableDataRow> {
 
                     DataPagePayload data = dataIo.readPayload(pageAddr, itemId, pageSize);
 
-                    if (data.nextLink() == 0 && nextLink == link) {
+                    if (!data.hasMoreFragments() && nextLink == link) {
                         // Good luck: we can compare the rows without fragments.
                         return compareRowsFull(pageAddr + data.offset(), row);
                     }
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java
index 36af324e7..6e609357c 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java
@@ -26,15 +26,23 @@ import org.apache.ignite.internal.pagememory.evict.PageEvictionTrackerNoOp;
 import org.apache.ignite.internal.pagememory.impl.PageMemoryNoStoreImpl;
 import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
 import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.pagememory.reuse.ReuseList;
 import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp;
 import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.pagememory.mv.RowVersionFreeList;
+import org.apache.ignite.internal.storage.pagememory.mv.VersionChainFreeList;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 
 /**
  * Implementation of {@link AbstractPageMemoryDataRegion} for in-memory case.
  */
-class VolatilePageMemoryDataRegion extends AbstractPageMemoryDataRegion {
-    private TableFreeList freeList;
+public class VolatilePageMemoryDataRegion extends AbstractPageMemoryDataRegion {
+    private static final int FREE_LIST_GROUP_ID = 0;
+
+    private TableFreeList tableFreeList;
+
+    private VersionChainFreeList versionChainFreeList;
+    private RowVersionFreeList rowVersionFreeList;
 
     /**
      * Constructor.
@@ -68,23 +76,71 @@ class VolatilePageMemoryDataRegion extends AbstractPageMemoryDataRegion {
         this.pageMemory = pageMemory;
 
         try {
-            int grpId = 0;
-
-            long metaPageId = pageMemory.allocatePage(grpId, INDEX_PARTITION, FLAG_AUX);
-
-            this.freeList = new TableFreeList(
-                    grpId,
-                    pageMemory,
-                    PageLockListenerNoOp.INSTANCE,
-                    metaPageId,
-                    true,
-                    null,
-                    PageEvictionTrackerNoOp.INSTANCE,
-                    IoStatisticsHolderNoOp.INSTANCE
-            );
+            this.tableFreeList = createTableFreeList(pageMemory);
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Error creating a TableFreeList", e);
+        }
+
+        try {
+            versionChainFreeList = createVersionChainFreeList(pageMemory, null);
         } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Error creating a freeList", e);
+            throw new StorageException("Error creating a VersionChainFreeList", e);
         }
+
+        try {
+            rowVersionFreeList = createRowVersionFreeList(pageMemory, tableFreeList);
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Error creating a RowVersionFreeList", e);
+        }
+    }
+
+    private TableFreeList createTableFreeList(PageMemory pageMemory) throws IgniteInternalCheckedException {
+        // The free list gets its own meta page, allocated with INDEX_PARTITION and FLAG_AUX.
+        long freeListMetaPageId = pageMemory.allocatePage(FREE_LIST_GROUP_ID, INDEX_PARTITION, FLAG_AUX);
+        return new TableFreeList(
+                FREE_LIST_GROUP_ID,
+                pageMemory,
+                PageLockListenerNoOp.INSTANCE,
+                freeListMetaPageId,
+                true,
+                null,
+                PageEvictionTrackerNoOp.INSTANCE,
+                IoStatisticsHolderNoOp.INSTANCE
+        );
+    }
+
+    private static VersionChainFreeList createVersionChainFreeList(PageMemory pageMemory, ReuseList reuseList)
+            throws IgniteInternalCheckedException {
+        // The free list gets its own meta page, allocated with INDEX_PARTITION and FLAG_AUX.
+        long freeListMetaPageId = pageMemory.allocatePage(FREE_LIST_GROUP_ID, INDEX_PARTITION, FLAG_AUX);
+        return new VersionChainFreeList(
+                FREE_LIST_GROUP_ID,
+                pageMemory,
+                reuseList,
+                PageLockListenerNoOp.INSTANCE,
+                freeListMetaPageId,
+                true,
+                null,
+                PageEvictionTrackerNoOp.INSTANCE,
+                IoStatisticsHolderNoOp.INSTANCE
+        );
+    }
+
+    private static RowVersionFreeList createRowVersionFreeList(PageMemory pageMemory, ReuseList reuseList)
+            throws IgniteInternalCheckedException {
+        // The free list gets its own meta page, allocated with INDEX_PARTITION and FLAG_AUX.
+        long freeListMetaPageId = pageMemory.allocatePage(FREE_LIST_GROUP_ID, INDEX_PARTITION, FLAG_AUX);
+        return new RowVersionFreeList(
+                FREE_LIST_GROUP_ID,
+                pageMemory,
+                reuseList,
+                PageLockListenerNoOp.INSTANCE,
+                freeListMetaPageId,
+                true,
+                null,
+                PageEvictionTrackerNoOp.INSTANCE,
+                IoStatisticsHolderNoOp.INSTANCE
+        );
     }
 
     /** {@inheritDoc} */
@@ -92,17 +148,31 @@ class VolatilePageMemoryDataRegion extends AbstractPageMemoryDataRegion {
     public void stop() {
         super.stop();
 
-        if (freeList != null) {
-            freeList.close();
+        if (tableFreeList != null) {
+            tableFreeList.close();
         }
     }
 
     /**
-     * Returns free list.
+     * Returns table free list.
      *
      * <p>NOTE: Free list must be one for the in-memory data region.
      */
-    public TableFreeList freeList() {
-        return freeList;
+    public TableFreeList tableFreeList() {
+        return tableFreeList;
+    }
+
+    /**
+     * Returns version chain free list.
+     */
+    public VersionChainFreeList versionChainFreeList() {
+        return versionChainFreeList;
+    }
+
+    /**
+     * Returns row version free list.
+     */
+    public RowVersionFreeList rowVersionFreeList() {
+        return rowVersionFreeList;
     }
 }
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
index 8eaf09d32..fab23050a 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
@@ -37,6 +37,6 @@ class VolatilePageMemoryTableStorage extends PageMemoryTableStorage {
     /** {@inheritDoc} */
     @Override
     protected PageMemoryPartitionStorage createPartitionStorage(int partId) throws StorageException {
-        return new PageMemoryPartitionStorage(partId, tableCfg, dataRegion, ((VolatilePageMemoryDataRegion) dataRegion).freeList());
+        return new PageMemoryPartitionStorage(partId, tableCfg, dataRegion, ((VolatilePageMemoryDataRegion) dataRegion).tableFreeList());
     }
 }
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/LinkRowId.java
similarity index 54%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
copy to modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/LinkRowId.java
index dbc6e3c2f..415bedbb2 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/LinkRowId.java
@@ -15,22 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.pagememory;
+package org.apache.ignite.internal.storage.pagememory.mv;
 
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.internal.storage.RowId;
 
 /**
- * Data region based on {@link PageMemory}.
+ * {@link RowId} implementation which identifies the row data using row link in page-memory.
+ *
+ * @see RowId
+ * @see PageIdUtils#link(long, int)
  */
-public interface PageMemoryDataRegion {
-    /**
-     * Returns {@link true} if the date region is persistent.
-     */
-    boolean persistent();
+public class LinkRowId implements RowId {
+    private final long rowLink;
+
+    public LinkRowId(long rowLink) {
+        this.rowLink = rowLink;
+    }
+
+    @Override
+    public int partitionId() {
+        long pageId = PageIdUtils.pageId(rowLink);
+        return PageIdUtils.partitionId(pageId);
+    }
 
-    /**
-     * Returns page memory, {@code null} if not started.
-     */
-    @Nullable
-    PageMemory pageMemory();
+    long versionChainLink() {
+        return rowLink;
+    }
 }
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java
new file mode 100644
index 000000000..32bfa270d
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+
+import java.nio.ByteBuffer;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
+import org.apache.ignite.internal.pagememory.datapage.DataPageReader;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.NoUncommittedVersionException;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageUtils;
+import org.apache.ignite.internal.storage.TxIdMismatchException;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteCursor;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Implementation of {@link MvPartitionStorage} using Page Memory.
+ *
+ * @see MvPartitionStorage
+ */
+public class PageMemoryMvPartitionStorage implements MvPartitionStorage {
+    private static final byte[] TOMBSTONE_PAYLOAD = new byte[0];
+
+    private static final Predicate<BinaryRow> MATCH_ALL = row -> true;
+
+    private static final Predicate<Timestamp> ALWAYS_LOAD_VALUE = timestamp -> true;
+    private static final Predicate<Timestamp> LOAD_VALUE_WHEN_UNCOMMITTED = RowVersion::isUncommitted;
+
+    private final int partitionId;
+    private final int groupId;
+
+    private final VersionChainFreeList versionChainFreeList;
+    private final VersionChainTree versionChainTree;
+    private final VersionChainDataPageReader versionChainDataPageReader;
+    private final RowVersionFreeList rowVersionFreeList;
+    private final DataPageReader rowVersionDataPageReader;
+
+    private final ThreadLocal<ReadLatestRowVersion> readLatestRowVersionCache = ThreadLocal.withInitial(ReadLatestRowVersion::new);
+    private final ThreadLocal<ScanVersionChainByTimestamp> scanVersionChainByTimestampCache = ThreadLocal.withInitial(
+            ScanVersionChainByTimestamp::new
+    );
+
+    /**
+     * Creates the storage of a single partition: its version chain tree and the readers for version chains and row versions.
+     */
+    public PageMemoryMvPartitionStorage(
+            int partitionId,
+            TableView tableConfig,
+            PageMemoryDataRegion dataRegion,
+            VersionChainFreeList versionChainFreeList,
+            RowVersionFreeList rowVersionFreeList
+    ) {
+        this.partitionId = partitionId;
+
+        this.versionChainFreeList = versionChainFreeList;
+        this.rowVersionFreeList = rowVersionFreeList;
+
+        groupId = StorageUtils.groupId(tableConfig);
+        // The groupId field has to be assigned by now: createVersionChainTree() reads it.
+        try {
+            versionChainTree = createVersionChainTree(partitionId, tableConfig, dataRegion, versionChainFreeList);
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Error occurred while creating the partition storage", e);
+        }
+
+        versionChainDataPageReader = new VersionChainDataPageReader(dataRegion.pageMemory(), groupId, IoStatisticsHolderNoOp.INSTANCE);
+        rowVersionDataPageReader = new DataPageReader(dataRegion.pageMemory(), groupId, IoStatisticsHolderNoOp.INSTANCE);
+    }
+
+    private VersionChainTree createVersionChainTree(
+            int partitionId,
+            TableView tableConfig,
+            PageMemoryDataRegion dataRegion,
+            VersionChainFreeList freeList
+    ) throws IgniteInternalCheckedException {
+        // TODO: IGNITE-16641 Obtain the existing tree root in the persistent case instead of allocating a new one.
+        long treeMetaPageId = dataRegion.pageMemory().allocatePage(groupId, partitionId, FLAG_AUX);
+
+        // TODO: IGNITE-16641 Take the persistent case into account; for now a new tree is always initialized.
+        boolean createNewTree = true;
+
+        return new VersionChainTree(
+                groupId,
+                tableConfig.name(),
+                dataRegion.pageMemory(),
+                PageLockListenerNoOp.INSTANCE,
+                new AtomicLong(),
+                treeMetaPageId,
+                freeList,
+                partitionId,
+                createNewTree
+        );
+    }
+
+    @Override
+    public @Nullable BinaryRow read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException {
+        VersionChain chain = findVersionChain(rowId);
+        if (chain != null) {
+            return findLatestRowVersion(chain, txId, MATCH_ALL);
+        }
+        // findVersionChain() found no chain for this row ID, so there is nothing to read.
+        return null;
+    }
+
+    @Override
+    public @Nullable BinaryRow read(RowId rowId, Timestamp timestamp) throws StorageException {
+        VersionChain chain = findVersionChain(rowId);
+        if (chain != null) {
+            return findRowVersionByTimestamp(chain, timestamp);
+        }
+        // findVersionChain() found no chain for this row ID, so there is nothing to read.
+        return null;
+    }
+
+    private @Nullable VersionChain findVersionChain(RowId rowId) {
+        long chainLink = versionChainLinkFrom(rowId);
+        try {
+            return versionChainDataPageReader.getRowByLink(chainLink);
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Version chain lookup failed", e);
+        }
+    }
+
+    private long versionChainLinkFrom(RowId rowId) {
+        boolean foreignPartition = rowId.partitionId() != partitionId;
+        if (foreignPartition) {
+            throw new IllegalArgumentException("I own partition " + partitionId + " but I was given RowId with partition "
+                    + rowId.partitionId());
+        }
+
+        // The link is only available from LinkRowId; any other RowId implementation fails the cast.
+        return ((LinkRowId) rowId).versionChainLink();
+    }
+
+    @Nullable
+    private ByteBufferRow findLatestRowVersion(VersionChain versionChain, UUID txId, Predicate<BinaryRow> keyFilter) {
+        // The latest version is read with ALWAYS_LOAD_VALUE; a tombstone is converted to a null row.
+        ByteBufferRow latestRow = rowVersionToBinaryRow(findLatestRowVersion(versionChain, ALWAYS_LOAD_VALUE));
+
+        // The filter runs before the ownership check, so a filtered-out row never triggers TxIdMismatchException.
+        if (keyFilter.test(latestRow)) {
+            throwIfChainBelongsToAnotherTx(versionChain, txId);
+            return latestRow;
+        }
+
+        return null;
+    }
+
+    private RowVersion findLatestRowVersion(VersionChain versionChain, Predicate<Timestamp> loadValue) {
+        long nextLink = PartitionlessLinks.addPartitionIdToPartititionlessLink(versionChain.headLink(), partitionId);
+        ReadLatestRowVersion read = freshReadLatestRowVersion();
+
+        try {
+            rowVersionDataPageReader.traverse(nextLink, read, loadValue);
+        } catch (IgniteInternalCheckedException e) {
+            // Keep the cause, as every other catch block in this class does; without it the page-memory failure is lost.
+            throw new StorageException("Row version lookup failed", e);
+        }
+
+        return read.result();
+    }
+
+    private ReadLatestRowVersion freshReadLatestRowVersion() {
+        ReadLatestRowVersion reusableTraversal = readLatestRowVersionCache.get(); // instance of the current thread
+        reusableTraversal.reset();
+        return reusableTraversal;
+    }
+
+    private void throwIfChainBelongsToAnotherTx(VersionChain versionChain, UUID txId) {
+        if (versionChain.transactionId() != null && !txId.equals(versionChain.transactionId())) {
+            throw new TxIdMismatchException(); // the chain carries a transaction ID that differs from txId
+        }
+    }
+
+    /**
+     * Converts a row version to a binary row.
+     * A tombstone version has no row to expose and is converted to {@code null}.
+     */
+    @Nullable
+    private ByteBufferRow rowVersionToBinaryRow(RowVersion rowVersion) {
+        return rowVersion.isTombstone() ? null : new ByteBufferRow(rowVersion.value());
+    }
+
+    @Nullable
+    private ByteBufferRow findRowVersionInChain(
+            VersionChain versionChain,
+            @Nullable UUID transactionId,
+            @Nullable Timestamp timestamp,
+            Predicate<BinaryRow> keyFilter
+    ) {
+        // Exactly one of the two lookup keys must be provided.
+        assert transactionId != null ^ timestamp != null;
+        if (transactionId == null) {
+            ByteBufferRow rowAtTimestamp = findRowVersionByTimestamp(versionChain, timestamp);
+            return keyFilter.test(rowAtTimestamp) ? rowAtTimestamp : null;
+        }
+
+        return findLatestRowVersion(versionChain, transactionId, keyFilter);
+    }
+
+    @Nullable
+    private ByteBufferRow findRowVersionByTimestamp(VersionChain versionChain, Timestamp timestamp) {
+        // The chain keeps a partitionless head link; the partition ID is added back before the traversal.
+        long headLink = PartitionlessLinks.addPartitionIdToPartititionlessLink(versionChain.headLink(), partitionId);
+
+        ScanVersionChainByTimestamp scan = freshScanByTimestamp();
+
+        try {
+            rowVersionDataPageReader.traverse(headLink, scan, timestamp);
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Cannot search for a row version", e);
+        }
+
+        return scan.result();
+    }
+
+    private ScanVersionChainByTimestamp freshScanByTimestamp() {
+        ScanVersionChainByTimestamp reusableScan = scanVersionChainByTimestampCache.get(); // instance of the current thread
+        reusableScan.reset();
+        return reusableScan;
+    }
+
+    @Override
+    public LinkRowId insert(BinaryRow row, UUID txId) throws StorageException {
+        RowVersion rowVersion = insertRowVersion(Objects.requireNonNull(row), RowVersion.NULL_LINK);
+        // The new chain has this single version as its head; the head is kept as a partitionless link.
+        VersionChain versionChain = new VersionChain(partitionId, txId, PartitionlessLinks.removePartitionIdFromLink(rowVersion.link()));
+
+        try {
+            versionChainFreeList.insertDataRow(versionChain);
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Cannot store a version chain", e);
+        }
+        // NOTE(review): presumably insertDataRow() above assigned versionChain its link, which is used from here on - confirm.
+        try {
+            versionChainTree.putx(versionChain);
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Cannot put a version chain to the tree", e);
+        }
+
+        return new LinkRowId(versionChain.link());
+    }
+
+    private RowVersion insertRowVersion(@Nullable BinaryRow row, long nextPartitionlessLink) {
+        // TODO IGNITE-16913 Add proper way to write row bytes into array without allocations.
+        // A null row is written with the empty tombstone payload.
+        ByteBuffer payload = ByteBuffer.wrap(row == null ? TOMBSTONE_PAYLOAD : row.bytes());
+
+        RowVersion newVersion = new RowVersion(partitionId, nextPartitionlessLink, payload);
+        insertRowVersion(newVersion);
+
+        return newVersion;
+    }
+
+    private void insertRowVersion(RowVersion rowVersion) {
+        try {
+            rowVersionFreeList.insertDataRow(rowVersion);
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Cannot store a row version", e); // checked failure rethrown with its cause
+        }
+    }
+
+    @Override
+    public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId) throws TxIdMismatchException, StorageException {
+        VersionChain currentChain = findVersionChainForModification(rowId);
+        // Reject the write before anything is stored if the chain carries another transaction's ID.
+        throwIfChainBelongsToAnotherTx(currentChain, txId);
+        // An uncommitted head is replaced (the new version takes over its next link); a committed head becomes the successor.
+        RowVersion currentVersion = findLatestRowVersion(currentChain, LOAD_VALUE_WHEN_UNCOMMITTED);
+        RowVersion newVersion = insertRowVersion(row, currentVersion.isUncommitted() ? currentVersion.nextLink() : currentChain.headLink());
+
+        if (currentVersion.isUncommitted()) {
+            // as we replace an uncommitted version with new one, we need to remove old uncommitted version
+            removeRowVersion(currentVersion);
+        }
+
+        VersionChain chainReplacement = new VersionChain(
+                partitionId,
+                txId,
+                PartitionlessLinks.removePartitionIdFromLink(newVersion.link())
+        );
+
+        updateVersionChain(currentChain, chainReplacement);
+
+        if (currentVersion.isUncommitted()) {
+            return rowVersionToBinaryRow(currentVersion);
+        } else {
+            return null;
+        }
+    }
+
+    @NotNull
+    private VersionChain findVersionChainForModification(RowId rowId) {
+        VersionChain currentChain = findVersionChain(rowId);
+        if (currentChain == null) {
+            throw new RowIdIsInvalidForModificationsException();
+        }
+        return currentChain;
+    }
+
+    @Override
+    public @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException {
+        VersionChain currentVersionChain = findVersionChainForModification(rowId);
+
+        if (currentVersionChain.transactionId() == null) {
+            throw new NoUncommittedVersionException();
+        }
+
+        RowVersion currentVersion = findLatestRowVersion(currentVersionChain, ALWAYS_LOAD_VALUE);
+        assert currentVersion.isUncommitted();
+
+        removeRowVersion(currentVersion);
+
+        if (currentVersion.hasNextLink()) {
+            VersionChain versionChainReplacement = VersionChain.withoutTxId(
+                    partitionId,
+                    currentVersionChain.link(),
+                    currentVersion.nextLink()
+            );
+            updateVersionChain(currentVersionChain, versionChainReplacement);
+        } else {
+            // It was the only version, so drop the chain too. NOTE(review): versionChainTree entry is not removed here - confirm.
+            removeVersionChain(currentVersionChain);
+        }
+
+        return rowVersionToBinaryRow(currentVersion);
+    }
+
+    private void removeVersionChain(VersionChain currentVersionChain) {
+        try {
+            versionChainFreeList.removeDataRowByLink(currentVersionChain.link());
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Cannot remove chain version", e);
+        }
+    }
+
+    @Override
+    public void commitWrite(RowId rowId, Timestamp timestamp) throws StorageException {
+        VersionChain currentVersionChain = findVersionChainForModification(rowId);
+        long chainLink = PartitionlessLinks.addPartitionIdToPartititionlessLink(currentVersionChain.headLink(), partitionId);
+
+        assert currentVersionChain.transactionId() != null;
+
+        try {
+            rowVersionFreeList.updateTimestamp(chainLink, timestamp);
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Cannot update timestamp", e);
+        }
+
+        try {
+            versionChainFreeList.updateTransactionId(currentVersionChain.link(), null);
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Cannot update transaction ID", e);
+        }
+    }
+
+    private void removeRowVersion(RowVersion currentVersion) {
+        try {
+            rowVersionFreeList.removeDataRowByLink(currentVersion.link());
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Cannot remove row version", e);
+        }
+    }
+
+    private void updateVersionChain(VersionChain currentVersionChain, VersionChain versionChainReplacement) {
+        try {
+            boolean updatedInPlace = versionChainFreeList.updateDataRow(currentVersionChain.link(), versionChainReplacement);
+            if (!updatedInPlace) {
+                throw new StorageException("Only in-place updates are supported");
+            }
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Cannot update version chain", e);
+        }
+    }
+
+    @Override
+    public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, UUID txId) throws TxIdMismatchException, StorageException {
+        return internalScan(keyFilter, txId, null);
+    }
+
+    @Override
+    public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, Timestamp timestamp) throws StorageException {
+        return internalScan(keyFilter, null, timestamp);
+    }
+
+    private Cursor<BinaryRow> internalScan(Predicate<BinaryRow> keyFilter, @Nullable UUID transactionId, @Nullable Timestamp timestamp) {
+        assert transactionId != null ^ timestamp != null;
+
+        IgniteCursor<VersionChain> treeCursor;
+        try {
+            treeCursor = versionChainTree.find(null, null);
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Find failed", e);
+        }
+
+        return new ScanCursor(treeCursor, keyFilter, transactionId, timestamp);
+    }
+
+    @Override
+    public void close() {
+        versionChainFreeList.close();
+        versionChainTree.close();
+        rowVersionFreeList.close();
+    }
+
+    private class ScanCursor implements Cursor<BinaryRow> {
+        private final IgniteCursor<VersionChain> treeCursor;
+        private final Predicate<BinaryRow> keyFilter;
+        private final @Nullable UUID transactionId;
+        private final @Nullable Timestamp timestamp;
+
+        private BinaryRow nextRow = null;
+        private boolean iterationExhausted = false;
+
+        public ScanCursor(
+                IgniteCursor<VersionChain> treeCursor,
+                Predicate<BinaryRow> keyFilter,
+                @Nullable UUID transactionId,
+                @Nullable Timestamp timestamp
+        ) {
+            this.treeCursor = treeCursor;
+            this.keyFilter = keyFilter;
+            this.transactionId = transactionId;
+            this.timestamp = timestamp;
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (nextRow != null) {
+                return true;
+            }
+            if (iterationExhausted) {
+                return false;
+            }
+
+            while (true) {
+                boolean positionedToNext = tryAdvanceTreeCursor();
+
+                if (!positionedToNext) {
+                    iterationExhausted = true;
+                    return false;
+                }
+
+                VersionChain chain = getCurrentChainFromTreeCursor();
+                ByteBufferRow row = findRowVersionInChain(chain, transactionId, timestamp, keyFilter);
+
+                if (row != null) {
+                    nextRow = row;
+                    return true;
+                }
+            }
+        }
+
+        private boolean tryAdvanceTreeCursor() {
+            try {
+                return treeCursor.next();
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException("Error when trying to advance tree cursor", e);
+            }
+        }
+
+        private VersionChain getCurrentChainFromTreeCursor() {
+            try {
+                return treeCursor.get();
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException("Failed to get element from tree cursor", e);
+            }
+        }
+
+        @Override
+        public BinaryRow next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException("The cursor is exhausted");
+            }
+
+            assert nextRow != null;
+
+            BinaryRow row = nextRow;
+            nextRow = null;
+
+            return row;
+        }
+
+        @Override
+        public void close() {
+            // no-op
+        }
+    }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvStorageIoModule.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvStorageIoModule.java
new file mode 100644
index 000000000..4778fbc9a
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvStorageIoModule.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.pagememory.io.PageIoModule;
+import org.apache.ignite.internal.storage.pagememory.mv.io.RowVersionDataIo;
+import org.apache.ignite.internal.storage.pagememory.mv.io.VersionChainDataIo;
+import org.apache.ignite.internal.storage.pagememory.mv.io.VersionChainInnerIo;
+import org.apache.ignite.internal.storage.pagememory.mv.io.VersionChainLeafIo;
+import org.apache.ignite.internal.storage.pagememory.mv.io.VersionChainMetaIo;
+
+/**
+ * {@link PageIoModule} related to {@link PageMemoryMvPartitionStorage} implementation.
+ */
+public class PageMemoryMvStorageIoModule implements PageIoModule {
+    /** {@inheritDoc} */
+    @Override
+    public Collection<IoVersions<?>> ioVersions() {
+        return List.of(
+                VersionChainDataIo.VERSIONS,
+                VersionChainMetaIo.VERSIONS,
+                VersionChainInnerIo.VERSIONS,
+                VersionChainLeafIo.VERSIONS,
+                RowVersionDataIo.VERSIONS
+        );
+    }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PartitionlessLinks.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PartitionlessLinks.java
new file mode 100644
index 000000000..9bc142155
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PartitionlessLinks.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getInt;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getShort;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putInt;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putShort;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+
+/**
+ * Handling of <em>partitionless links</em>, that is, page memory links from which partition ID is removed.
+ * They are used to spare storage space in cases when we know the partition ID from the context.
+ *
+ * @see PageIdUtils#link(long, int)
+ */
+public class PartitionlessLinks {
+    /**
+     * Number of bytes a partitionless link takes in storage.
+     */
+    public static final int PARTITIONLESS_LINK_SIZE_BYTES = 6;
+
+    /**
+     * Converts a full link to partitionless link by removing the partition ID from it.
+     *
+     * @param link full link
+     * @return partitionless link
+     * @see PageIdUtils#link(long, int)
+     */
+    public static long removePartitionIdFromLink(long link) {
+        return ((link >> PageIdUtils.PART_ID_SIZE) & 0x0000FFFF00000000L)
+                | (link & 0xFFFFFFFFL);
+    }
+
+    /**
+     * Converts a partitionless link to a full link by inserting partition ID at the corresponding position.
+     *
+     * @param partitionlessLink link without partition
+     * @param partitionId       partition ID to insert
+     * @return full link
+     * @see PageIdUtils#link(long, int)
+     */
+    public static long addPartitionIdToPartititionlessLink(long partitionlessLink, int partitionId) {
+        return (partitionlessLink << PageIdUtils.PART_ID_SIZE) & 0xFFFF000000000000L
+                | ((((long) partitionId) << PageIdUtils.PAGE_IDX_SIZE) & 0xFFFF00000000L)
+                | (partitionlessLink & PageIdUtils.PAGE_IDX_MASK);
+    }
+
+    /**
+     * Returns high 16 bits of the 6 bytes representing a partitionless link.
+     *
+     * @param partitionlessLink link without partition info
+     * @return high 16 bits
+     * @see #assemble(short, int)
+     */
+    public static short high16Bits(long partitionlessLink) {
+        return (short) (partitionlessLink >> Integer.SIZE);
+    }
+
+    /**
+     * Returns low 32 bits of the 6 bytes representing a partitionless link.
+     *
+     * @param partitionlessLink link without partition info
+     * @return low 32 bits
+     * @see #assemble(short, int)
+     */
+    public static int low32Bits(long partitionlessLink) {
+        return (int) (partitionlessLink & PageIdUtils.PAGE_IDX_MASK);
+    }
+
+    /**
+     * Assembles a partitionless link from high 16 bits and 32 low bits of its representation.
+     *
+     * @param high16    high 16 bits
+     * @param low32     low 32 bits
+     * @return reconstructed partitionless link
+     * @see #high16Bits(long)
+     * @see #low32Bits(long)
+     */
+    public static long assemble(short high16, int low32) {
+        return ((((long) high16) & 0xFFFF) << 32)
+                | (((long) low32) & 0xFFFFFFFFL);
+    }
+
+    static long readFromMemory(long pageAddr, int offset) {
+        short nextLinkHigh16 = getShort(pageAddr, offset);
+        int nextLinkLow32 = getInt(pageAddr, offset + Short.BYTES);
+
+        return assemble(nextLinkHigh16, nextLinkLow32);
+    }
+
+    static long readFromBuffer(ByteBuffer buffer) {
+        short nextLinkHigh16 = buffer.getShort();
+        int nextLinkLow32 = buffer.getInt();
+
+        return assemble(nextLinkHigh16, nextLinkLow32);
+    }
+
+    /**
+     * Writes a partitionless link to memory: first high 2 bytes, then low 4 bytes.
+     *
+     * @param addr              address in memory where to start
+     * @param partitionlessLink the link to write
+     * @return number of bytes written (equal to {@link #PARTITIONLESS_LINK_SIZE_BYTES})
+     */
+    public static long writeToMemory(long addr, long partitionlessLink) {
+        putShort(addr, 0, PartitionlessLinks.high16Bits(partitionlessLink));
+        addr += Short.BYTES;
+        putInt(addr, 0, PartitionlessLinks.low32Bits(partitionlessLink));
+
+        return PartitionlessLinks.PARTITIONLESS_LINK_SIZE_BYTES;
+    }
+
+    private PartitionlessLinks() {
+        // prevent instantiation
+    }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadLatestRowVersion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadLatestRowVersion.java
new file mode 100644
index 000000000..363867483
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadLatestRowVersion.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import java.nio.ByteBuffer;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal;
+import org.apache.ignite.internal.pagememory.io.DataPagePayload;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Traversal for reading the latest row version. Its value is loaded only if the supplied predicate accepts the version's timestamp.
+ */
+class ReadLatestRowVersion implements PageMemoryTraversal<Predicate<Timestamp>> {
+    private RowVersion result;
+
+    private boolean readingFirstSlot = true;
+
+    private long firstFragmentLink;
+    @Nullable
+    private Timestamp timestamp;
+    private long nextLink;
+
+    private final ReadRowVersionValue readRowVersionValue = new ReadRowVersionValue();
+
+    @Override
+    public long consumePagePayload(long link, long pageAddr, DataPagePayload payload, Predicate<Timestamp> loadValue) {
+        if (readingFirstSlot) {
+            readingFirstSlot = false;
+            return readFullOrInitiateReadFragmented(link, pageAddr, payload, loadValue);
+        } else {
+            return readRowVersionValue.consumePagePayload(link, pageAddr, payload, null);
+        }
+    }
+
+    private long readFullOrInitiateReadFragmented(long link, long pageAddr, DataPagePayload payload, Predicate<Timestamp> loadValue) {
+        firstFragmentLink = link;
+
+        timestamp = Timestamps.readTimestamp(pageAddr, payload.offset() + RowVersion.TIMESTAMP_OFFSET);
+        nextLink = PartitionlessLinks.readFromMemory(pageAddr, payload.offset() + RowVersion.NEXT_LINK_OFFSET);
+
+        if (!loadValue.test(timestamp)) {
+            result = new RowVersion(partitionIdFromLink(link), firstFragmentLink, timestamp, nextLink, null);
+            return STOP_TRAVERSAL;
+        }
+
+        return readRowVersionValue.consumePagePayload(link, pageAddr, payload, null);
+    }
+
+    private int partitionIdFromLink(long link) {
+        return PageIdUtils.partitionId(PageIdUtils.pageId(link));
+    }
+
+    @Override
+    public void finish() {
+        if (result != null) {
+            // we did not read the value itself, so we don't need to invoke readRowVersionValue.finish()
+            return;
+        }
+
+        readRowVersionValue.finish();
+
+        byte[] valueBytes = readRowVersionValue.result();
+        ByteBuffer value = ByteBuffer.wrap(valueBytes).order(ByteBufferRow.ORDER);
+        result = new RowVersion(partitionIdFromLink(firstFragmentLink), firstFragmentLink, timestamp, nextLink, value);
+    }
+
+    RowVersion result() {
+        return result;
+    }
+
+    void reset() {
+        result = null;
+        readingFirstSlot = true;
+        readRowVersionValue.reset();
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
similarity index 63%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
copy to modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
index dbc6e3c2f..b1a3a3e4f 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersionValue.java
@@ -15,22 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.pagememory;
+package org.apache.ignite.internal.storage.pagememory.mv;
 
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.pagememory.datapage.ReadPageMemoryRowValue;
 
 /**
- * Data region based on {@link PageMemory}.
+ * Reads {@link RowVersion#value()} from page-memory.
  */
-public interface PageMemoryDataRegion {
-    /**
-     * Returns {@link true} if the date region is persistent.
-     */
-    boolean persistent();
+class ReadRowVersionValue extends ReadPageMemoryRowValue {
+    @Override
+    protected int valueSizeOffsetInFirstSlot() {
+        return RowVersion.VALUE_SIZE_OFFSET;
+    }
 
-    /**
-     * Returns page memory, {@code null} if not started.
-     */
-    @Nullable
-    PageMemory pageMemory();
+    @Override
+    protected int valueOffsetInFirstSlot() {
+        return RowVersion.VALUE_OFFSET;
+    }
 }
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowIdIsInvalidForModificationsException.java
similarity index 67%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
copy to modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowIdIsInvalidForModificationsException.java
index dbc6e3c2f..c847fd740 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowIdIsInvalidForModificationsException.java
@@ -15,22 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.pagememory;
+package org.apache.ignite.internal.storage.pagememory.mv;
 
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.lang.IgniteInternalException;
 
 /**
- * Data region based on {@link PageMemory}.
+ * Thrown when trying to do a modification at {@link RowId} that has already become invalid for writes.
  */
-public interface PageMemoryDataRegion {
-    /**
-     * Returns {@link true} if the date region is persistent.
-     */
-    boolean persistent();
-
-    /**
-     * Returns page memory, {@code null} if not started.
-     */
-    @Nullable
-    PageMemory pageMemory();
+class RowIdIsInvalidForModificationsException extends IgniteInternalException {
 }
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
new file mode 100644
index 000000000..a75342b2d
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import org.apache.ignite.internal.pagememory.Storable;
+import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo;
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.storage.pagememory.mv.io.RowVersionDataIo;
+import org.apache.ignite.internal.tostring.IgniteToStringExclude;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Represents row version inside row version chain.
+ */
+public class RowVersion implements Storable {
+    /**
+     * A 'timestamp' representing absence of a timestamp.
+     */
+    public static final Timestamp NULL_TIMESTAMP = new Timestamp(Long.MIN_VALUE, Long.MIN_VALUE);
+    /**
+     * Represents an absent partitionless link.
+     */
+    public static final long NULL_LINK = 0;
+
+    private static final int TIMESTAMP_STORE_SIZE_BYTES = 2 * Long.BYTES;
+    private static final int NEXT_LINK_STORE_SIZE_BYTES = PartitionlessLinks.PARTITIONLESS_LINK_SIZE_BYTES;
+    private static final int VALUE_SIZE_STORE_SIZE_BYTES = Integer.BYTES;
+
+    public static final int TIMESTAMP_OFFSET = 0;
+    public static final int NEXT_LINK_OFFSET = TIMESTAMP_STORE_SIZE_BYTES;
+    public static final int VALUE_SIZE_OFFSET = NEXT_LINK_OFFSET + NEXT_LINK_STORE_SIZE_BYTES;
+    public static final int VALUE_OFFSET = VALUE_SIZE_OFFSET + VALUE_SIZE_STORE_SIZE_BYTES;
+
+    private final int partitionId;
+    private long link;
+
+    @Nullable
+    private final Timestamp timestamp;
+    private final long nextLink;
+    private final int valueSize;
+    @IgniteToStringExclude
+    @Nullable
+    private final ByteBuffer value;
+
+    /**
+     * Constructor.
+     */
+    public RowVersion(int partitionId, long nextLink, ByteBuffer value) {
+        this(partitionId, 0, null, nextLink, value);
+    }
+
+    /**
+     * Constructor.
+     */
+    public RowVersion(int partitionId, long link, @Nullable Timestamp timestamp, long nextLink, @Nullable ByteBuffer value) {
+        this.partitionId = partitionId;
+        link(link);
+
+        assert !NULL_TIMESTAMP.equals(timestamp) : "Null timestamp provided";
+
+        this.timestamp = timestamp;
+        this.nextLink = nextLink;
+        this.valueSize = value == null ? -1 : value.limit();
+        this.value = value;
+    }
+
+    @Nullable
+    public Timestamp timestamp() {
+        return timestamp;
+    }
+
+    public Timestamp timestampForStorage() {
+        return timestampForStorage(timestamp);
+    }
+
+    static Timestamp timestampForStorage(Timestamp timestamp) {
+        return timestamp == null ? NULL_TIMESTAMP : timestamp;
+    }
+
+    /**
+     * Returns partitionless link of the next version or {@code 0} if this version is the last in the chain (i.e. it's the oldest version).
+     *
+     * @return partitionless link of the next version or {@code 0} if this version is the last in the chain
+     */
+    public long nextLink() {
+        return nextLink;
+    }
+
+    public int valueSize() {
+        return valueSize;
+    }
+
+    public ByteBuffer value() {
+        return Objects.requireNonNull(value);
+    }
+
+    public boolean hasNextLink() {
+        return nextLink != NULL_LINK;
+    }
+
+    boolean isTombstone() {
+        return isTombstone(valueSize());
+    }
+
+    static boolean isTombstone(int valueSize) {
+        return valueSize == 0;
+    }
+
+    static boolean isTombstone(byte[] valueBytes) {
+        return isTombstone(valueBytes.length);
+    }
+
+    boolean isUncommitted() {
+        return isUncommitted(timestamp);
+    }
+
+    static boolean isUncommitted(Timestamp timestamp) {
+        return timestamp == null;
+    }
+
+    boolean isCommitted() {
+        return timestamp != null;
+    }
+
+    @Override
+    public final void link(long link) {
+        this.link = link;
+    }
+
+    @Override
+    public final long link() {
+        return link;
+    }
+
+    @Override
+    public final int partition() {
+        return partitionId;
+    }
+
+    @Override
+    public int size() {
+        assert value != null;
+
+        return TIMESTAMP_STORE_SIZE_BYTES + NEXT_LINK_STORE_SIZE_BYTES + VALUE_SIZE_STORE_SIZE_BYTES + value.limit();
+    }
+
+    @Override
+    public int headerSize() {
+        return TIMESTAMP_STORE_SIZE_BYTES + NEXT_LINK_STORE_SIZE_BYTES + VALUE_SIZE_STORE_SIZE_BYTES;
+    }
+
+    @Override
+    public IoVersions<? extends AbstractDataPageIo> ioVersions() {
+        return RowVersionDataIo.VERSIONS;
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(RowVersion.class, this);
+    }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionFreeList.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionFreeList.java
new file mode 100644
index 000000000..622415878
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionFreeList.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.INDEX_PARTITION;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.evict.PageEvictionTracker;
+import org.apache.ignite.internal.pagememory.freelist.AbstractFreeList;
+import org.apache.ignite.internal.pagememory.io.PageIo;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.pagememory.reuse.ReuseList;
+import org.apache.ignite.internal.pagememory.util.PageHandler;
+import org.apache.ignite.internal.pagememory.util.PageLockListener;
+import org.apache.ignite.internal.storage.pagememory.mv.io.RowVersionDataIo;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link AbstractFreeList} for {@link RowVersion} instances.
+ */
+public class RowVersionFreeList extends AbstractFreeList<RowVersion> {
+    private static final IgniteLogger LOG = IgniteLogger.forClass(RowVersionFreeList.class);
+
+    private final PageEvictionTracker evictionTracker;
+    private final IoStatisticsHolder statHolder;
+
+    private final UpdateTimestampHandler updateTimestampHandler = new UpdateTimestampHandler();
+
+    /**
+     * Constructor.
+     *
+     * @param grpId              Group ID.
+     * @param pageMem            Page memory.
+     * @param reuseList          Reuse list to track pages that can be reused after they get completely empty (if {@code null},
+     *                           the free list itself will be used as a ReuseList).
+     * @param lockLsnr           Page lock listener.
+     * @param metaPageId         Metadata page ID.
+     * @param initNew            {@code True} if new metadata should be initialized.
+     * @param pageListCacheLimit Page list cache limit.
+     * @param evictionTracker    Page eviction tracker.
+     * @param statHolder         Statistics holder to track IO operations.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public RowVersionFreeList(
+            int grpId,
+            PageMemory pageMem,
+            @Nullable ReuseList reuseList,
+            PageLockListener lockLsnr,
+            long metaPageId,
+            boolean initNew,
+            @Nullable AtomicLong pageListCacheLimit,
+            PageEvictionTracker evictionTracker,
+            IoStatisticsHolder statHolder
+    ) throws IgniteInternalCheckedException {
+        super(
+                grpId,
+                "RowVersionFreeList_" + grpId,
+                pageMem,
+                reuseList,
+                lockLsnr,
+                FLAG_AUX,
+                LOG,
+                metaPageId,
+                initNew,
+                pageListCacheLimit,
+                evictionTracker
+        );
+
+        this.evictionTracker = evictionTracker;
+        this.statHolder = statHolder;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected long allocatePageNoReuse() throws IgniteInternalCheckedException {
+        return pageMem.allocatePage(grpId, INDEX_PARTITION, defaultPageFlag);
+    }
+
+    /**
+     * Inserts a row.
+     *
+     * @param row Row.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public void insertDataRow(RowVersion row) throws IgniteInternalCheckedException {
+        super.insertDataRow(row, statHolder);
+    }
+
+    /**
+     * Updates row version's timestamp.
+     *
+     * @param link         link to the slot containing row version
+     * @param newTimestamp timestamp to set
+     * @throws IgniteInternalCheckedException if something fails
+     */
+    public void updateTimestamp(long link, Timestamp newTimestamp) throws IgniteInternalCheckedException {
+        updateDataRow(link, updateTimestampHandler, newTimestamp, statHolder);
+    }
+
+    /**
+     * Removes a row by link.
+     *
+     * @param link Row link.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public void removeDataRowByLink(long link) throws IgniteInternalCheckedException {
+        super.removeDataRowByLink(link, statHolder);
+    }
+
+    private class UpdateTimestampHandler implements PageHandler<Timestamp, Object> {
+        @Override
+        public Object run(
+                int groupId,
+                long pageId,
+                long page,
+                long pageAddr,
+                PageIo io,
+                Timestamp arg,
+                int itemId,
+                IoStatisticsHolder statHolder
+        ) throws IgniteInternalCheckedException {
+            RowVersionDataIo dataIo = (RowVersionDataIo) io;
+
+            dataIo.updateTimestamp(pageAddr, itemId, pageSize(), arg);
+
+            evictionTracker.touchPage(pageId);
+
+            return true;
+        }
+    }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java
new file mode 100644
index 000000000..19f3086e1
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal;
+import org.apache.ignite.internal.pagememory.io.DataPagePayload;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Traversal that scans a Version Chain until the first version visible at the given timestamp is found; then the version
+ * is converted to {@link ByteBufferRow} and finally made available via {@link #result()}.
+ *
+ * <p>NB: this traversal first traverses starting data slots of the Version Chain one after another; when it finds the
+ * version it needs, it switches to traversing the slots comprising the version (because it might be fragmented).
+ */
+class ScanVersionChainByTimestamp implements PageMemoryTraversal<Timestamp> {
+    /**
+     * Contains the result when the traversal ends.
+     */
+    @Nullable
+    private ByteBufferRow result;
+
+    /**
+     * First it's {@code true} (this means that we traverse first slots of versions of the Version Chain using NextLink);
+     * then it's {@code false} (when we found the version we need and we read its value).
+     */
+    private boolean lookingForVersion = true;
+
+    private final ReadRowVersionValue readRowVersionValue = new ReadRowVersionValue();
+
+    @Override
+    public long consumePagePayload(long link, long pageAddr, DataPagePayload payload, Timestamp timestamp) {
+        if (lookingForVersion) {
+            Timestamp rowVersionTs = Timestamps.readTimestamp(pageAddr, payload.offset() + RowVersion.TIMESTAMP_OFFSET);
+
+            if (rowTimestampMatches(rowVersionTs, timestamp)) {
+                return readFullyOrStartReadingFragmented(link, pageAddr, payload);
+            } else {
+                return advanceToNextVersion(pageAddr, payload, partitionIdFromLink(link));
+            }
+        } else {
+            // We are continuing reading a fragmented row.
+            return readNextFragment(link, pageAddr, payload);
+        }
+    }
+
+    private int partitionIdFromLink(long link) {
+        return PageIdUtils.partitionId(PageIdUtils.pageId(link));
+    }
+
+    private boolean rowTimestampMatches(Timestamp rowVersionTs, Timestamp timestamp) {
+        return rowVersionTs != null && rowVersionTs.beforeOrEquals(timestamp);
+    }
+
+    private long readFullyOrStartReadingFragmented(long link, long pageAddr, DataPagePayload payload) {
+        lookingForVersion = false;
+
+        return readRowVersionValue.consumePagePayload(link, pageAddr, payload, null);
+    }
+
+    private long advanceToNextVersion(long pageAddr, DataPagePayload payload, int partitionId) {
+        long partitionlessNextLink = PartitionlessLinks.readFromMemory(pageAddr, payload.offset() + RowVersion.NEXT_LINK_OFFSET);
+        if (partitionlessNextLink == RowVersion.NULL_LINK) {
+            return STOP_TRAVERSAL;
+        }
+        return PartitionlessLinks.addPartitionIdToPartititionlessLink(partitionlessNextLink, partitionId);
+    }
+
+    private long readNextFragment(long link, long pageAddr, DataPagePayload payload) {
+        return readRowVersionValue.consumePagePayload(link, pageAddr, payload, null);
+    }
+
+    @Override
+    public void finish() {
+        if (lookingForVersion) {
+            // we never found the version -> we never tried to read its value AND the result is null
+            result = null;
+            return;
+        }
+
+        readRowVersionValue.finish();
+
+        byte[] valueBytes = readRowVersionValue.result();
+
+        if (RowVersion.isTombstone(valueBytes)) {
+            result = null;
+        } else {
+            result = new ByteBufferRow(valueBytes);
+        }
+    }
+
+    @Nullable
+    ByteBufferRow result() {
+        return result;
+    }
+
+    void reset() {
+        result = null;
+        lookingForVersion = true;
+        readRowVersionValue.reset();
+    }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/Timestamps.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/Timestamps.java
new file mode 100644
index 000000000..f78bb42eb
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/Timestamps.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
+
+import org.apache.ignite.internal.tx.Timestamp;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Code to work with {@link Timestamp}s.
+ */
+public class Timestamps {
+    /**
+     * Reads a {@link Timestamp} value from memory.
+     *
+     * @param pageAddr address where page data starts
+     * @param offset   offset to the timestamp value relative to pageAddr
+     * @return the timestamp
+     */
+    @Nullable
+    static Timestamp readTimestamp(long pageAddr, int offset) {
+        long nodeId = getLong(pageAddr, offset);
+        long localTimestamp = getLong(pageAddr, offset + Long.BYTES);
+
+        Timestamp timestamp = new Timestamp(localTimestamp, nodeId);
+        if (timestamp.equals(RowVersion.NULL_TIMESTAMP)) {
+            timestamp = null;
+        }
+
+        return timestamp;
+    }
+
+    /**
+     * Writes a {@link Timestamp} to memory starting at the given address + offset.
+     *
+     * @param addr      memory address
+     * @param offset    offset added to the address
+     * @param timestamp the timestamp to write
+     * @return number of bytes written
+     */
+    public static int writeTimestamp(long addr, int offset, @Nullable Timestamp timestamp) {
+        Timestamp timestampForStorage = RowVersion.timestampForStorage(timestamp);
+
+        putLong(addr, offset, timestampForStorage.getNodeId());
+        putLong(addr, offset + Long.BYTES, timestampForStorage.getTimestamp());
+
+        return 2 * Long.BYTES;
+    }
+
+    private Timestamps() {
+        // prevent instantiation
+    }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/TransactionIds.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/TransactionIds.java
new file mode 100644
index 000000000..709d7b18e
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/TransactionIds.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
+
+import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Utils to work with Transaction IDs.
+ */
+public class TransactionIds {
+    /**
+     * Writes transaction ID to memory starting at the specified address.
+     *
+     * @param addr   address where to start writing
+     * @param offset offset to add to the address
+     * @param txId   transaction ID to write
+     * @return number of bytes written
+     */
+    public static int writeTransactionId(long addr, int offset, @Nullable UUID txId) {
+        long txIdHigh;
+        long txIdLow;
+        if (txId != null) {
+            txIdHigh = txId.getMostSignificantBits();
+            txIdLow = txId.getLeastSignificantBits();
+        } else {
+            txIdHigh = VersionChain.NULL_UUID_COMPONENT;
+            txIdLow = VersionChain.NULL_UUID_COMPONENT;
+        }
+
+        putLong(addr, offset, txIdHigh);
+        putLong(addr, offset + Long.BYTES, txIdLow);
+
+        return 2 * Long.BYTES;
+    }
+
+    private TransactionIds() {
+        // prevent instantiation
+    }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java
new file mode 100644
index 000000000..15c702a9d
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import java.util.UUID;
+import org.apache.ignite.internal.pagememory.Storable;
+import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo;
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.storage.pagememory.mv.io.VersionChainDataIo;
+import org.apache.ignite.internal.tostring.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Represents a row version chain: that is, all versions of the row plus some row-level metadata (transaction ID, head link).
+ *
+ * <p>NB: this represents the whole set of versions, not just one version in the chain.
+ */
+public class VersionChain extends VersionChainLink implements Storable {
+    public static final long NULL_UUID_COMPONENT = 0;
+
+    private static final int TRANSACTION_ID_STORE_SIZE_BYTES = 2 * Long.BYTES;
+    private static final int HEAD_LINK_STORE_SIZE_BYTES = PartitionlessLinks.PARTITIONLESS_LINK_SIZE_BYTES;
+
+    public static final int TRANSACTION_ID_OFFSET = 0;
+
+    private final int partitionId;
+    @Nullable
+    private final UUID transactionId;
+    private final long headLink;
+
+    // TODO: IGNITE-17008 - add nextLink
+
+    /**
+     * Constructs a VersionChain addressed by {@code link} that has no transaction ID (it is passed as {@code null}).
+     */
+    public static VersionChain withoutTxId(int partitionId, long link, long headLink) {
+        return new VersionChain(partitionId, link, null, headLink);
+    }
+
+    /**
+     * Constructs a VersionChain whose own link is not set by this constructor (it can be set later via {@link #link(long)}).
+     */
+    public VersionChain(int partitionId, @Nullable UUID transactionId, long headLink) {
+        this.partitionId = partitionId;
+        this.transactionId = transactionId;
+        this.headLink = headLink;
+    }
+
+    /**
+     * Constructs a VersionChain addressed by the given {@code link}; {@code transactionId} may be {@code null}.
+     */
+    public VersionChain(int partitionId, long link, @Nullable UUID transactionId, long headLink) {
+        super(link);
+        this.partitionId = partitionId;
+        this.transactionId = transactionId;
+        this.headLink = headLink;
+    }
+
+    @Nullable
+    public UUID transactionId() {
+        return transactionId;
+    }
+
+    public long headLink() {
+        return headLink;
+    }
+
+    @Override
+    public final int partition() {
+        return partitionId;
+    }
+
+    @Override
+    public int size() {
+        return TRANSACTION_ID_STORE_SIZE_BYTES + HEAD_LINK_STORE_SIZE_BYTES;
+    }
+
+    @Override
+    public int headerSize() {
+        return size();
+    }
+
+    @Override
+    public IoVersions<? extends AbstractDataPageIo> ioVersions() {
+        return VersionChainDataIo.VERSIONS;
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(VersionChain.class, this);
+    }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainDataPageReader.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainDataPageReader.java
new file mode 100644
index 000000000..2b8a8221f
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainDataPageReader.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
+
+import java.util.UUID;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.datapage.DataPageReader;
+import org.apache.ignite.internal.pagememory.datapage.NonFragmentableDataPageReader;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link DataPageReader} for {@link VersionChain} instances.
+ */
+class VersionChainDataPageReader extends NonFragmentableDataPageReader<VersionChain> {
+    /**
+     * Constructs a new instance.
+     *
+     * @param pageMemory       page memory that will be used to lock and access memory
+     * @param groupId          ID of the cache group with which the reader works (all pages must belong to this group)
+     * @param statisticsHolder used to track statistics about operations
+     */
+    public VersionChainDataPageReader(PageMemory pageMemory, int groupId, IoStatisticsHolder statisticsHolder) {
+        super(pageMemory, groupId, statisticsHolder);
+    }
+
+    @Nullable
+    private UUID assembleTxId(long high, long low) {
+        if (high == VersionChain.NULL_UUID_COMPONENT && low == VersionChain.NULL_UUID_COMPONENT) {
+            return null;
+        } else {
+            return new UUID(high, low);
+        }
+    }
+
+    @Override
+    protected VersionChain readRowFromAddress(long link, long pageAddr) {
+        int offset = 0;
+
+        long high = getLong(pageAddr, offset);
+        offset += Long.BYTES;
+        long low = getLong(pageAddr, offset);
+        offset += Long.BYTES;
+        UUID txId = assembleTxId(high, low);
+
+        long headLink = PartitionlessLinks.readFromMemory(pageAddr, offset);
+        offset += PartitionlessLinks.PARTITIONLESS_LINK_SIZE_BYTES;
+
+        return new VersionChain(partitionOfLink(link), link, txId, headLink);
+    }
+
+    private int partitionOfLink(long link) {
+        return partitionId(pageId(link));
+    }
+
+    @Override
+    protected boolean handleNonExistentItemsGracefully() {
+        return true;
+    }
+
+    @Override
+    protected @Nullable VersionChain rowForNonExistingItem(long pageId, long itemId) {
+        return null;
+    }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainFreeList.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainFreeList.java
new file mode 100644
index 000000000..e954918d3
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainFreeList.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.INDEX_PARTITION;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.evict.PageEvictionTracker;
+import org.apache.ignite.internal.pagememory.freelist.AbstractFreeList;
+import org.apache.ignite.internal.pagememory.io.PageIo;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.pagememory.reuse.ReuseList;
+import org.apache.ignite.internal.pagememory.util.PageHandler;
+import org.apache.ignite.internal.pagememory.util.PageLockListener;
+import org.apache.ignite.internal.storage.pagememory.mv.io.VersionChainDataIo;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link AbstractFreeList} for {@link VersionChain} instances.
+ */
+public class VersionChainFreeList extends AbstractFreeList<VersionChain> {
+    private static final IgniteLogger LOG = IgniteLogger.forClass(VersionChainFreeList.class);
+
+    private final PageEvictionTracker evictionTracker;
+    private final IoStatisticsHolder statHolder;
+
+    private final UpdateTransactionIdHandler updateTransactionHandler = new UpdateTransactionIdHandler();
+
+    /**
+     * Constructor.
+     *
+     * @param grpId              Group ID.
+     * @param pageMem            Page memory.
+     * @param reuseList          Reuse list to use.
+     * @param lockLsnr           Page lock listener.
+     * @param metaPageId         Metadata page ID.
+     * @param initNew            {@code True} if new metadata should be initialized.
+     * @param pageListCacheLimit Page list cache limit.
+     * @param evictionTracker    Page eviction tracker.
+     * @param statHolder         Statistics holder to track IO operations.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public VersionChainFreeList(
+            int grpId,
+            PageMemory pageMem,
+            ReuseList reuseList,
+            PageLockListener lockLsnr,
+            long metaPageId,
+            boolean initNew,
+            @Nullable AtomicLong pageListCacheLimit,
+            PageEvictionTracker evictionTracker,
+            IoStatisticsHolder statHolder
+    ) throws IgniteInternalCheckedException {
+        super(
+                grpId,
+                "VersionChainFreeList_" + grpId,
+                pageMem,
+                reuseList,
+                lockLsnr,
+                FLAG_AUX,
+                LOG,
+                metaPageId,
+                initNew,
+                pageListCacheLimit,
+                evictionTracker
+        );
+
+        this.evictionTracker = evictionTracker;
+        this.statHolder = statHolder;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected long allocatePageNoReuse() throws IgniteInternalCheckedException {
+        return pageMem.allocatePage(grpId, INDEX_PARTITION, defaultPageFlag);
+    }
+
+    /**
+     * Inserts a row.
+     *
+     * @param row Row.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public void insertDataRow(VersionChain row) throws IgniteInternalCheckedException {
+        super.insertDataRow(row, statHolder);
+    }
+
+    /**
+     * Updates a row by link.
+     *
+     * @param link Row link.
+     * @param row New row data.
+     * @return {@code True} if was able to update row.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public boolean updateDataRow(long link, VersionChain row) throws IgniteInternalCheckedException {
+        return super.updateDataRow(link, row, statHolder);
+    }
+
+    /**
+     * Updates version chain's transactionId.
+     *
+     * @param link          link to the slot containing the version chain
+     * @param transactionId transaction ID to set
+     * @throws IgniteInternalCheckedException if something fails
+     */
+    public void updateTransactionId(long link, @Nullable UUID transactionId) throws IgniteInternalCheckedException {
+        updateDataRow(link, updateTransactionHandler, transactionId, statHolder);
+    }
+
+    /**
+     * Removes a row by link.
+     *
+     * @param link Row link.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public void removeDataRowByLink(long link) throws IgniteInternalCheckedException {
+        super.removeDataRowByLink(link, statHolder);
+    }
+
+    private class UpdateTransactionIdHandler implements PageHandler<UUID, Object> {
+        @Override
+        public Object run(
+                int groupId,
+                long pageId,
+                long page,
+                long pageAddr,
+                PageIo io,
+                UUID arg,
+                int itemId,
+                IoStatisticsHolder statHolder
+        ) throws IgniteInternalCheckedException {
+            VersionChainDataIo dataIo = (VersionChainDataIo) io;
+
+            dataIo.updateTransactionId(pageAddr, itemId, pageSize(), arg);
+
+            evictionTracker.touchPage(pageId);
+
+            return true;
+        }
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainLink.java
similarity index 65%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
copy to modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainLink.java
index dbc6e3c2f..4d0d9514f 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainLink.java
@@ -15,22 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.pagememory;
-
-import org.jetbrains.annotations.Nullable;
+package org.apache.ignite.internal.storage.pagememory.mv;
 
 /**
- * Data region based on {@link PageMemory}.
+ * Lookup key of {@link VersionChainTree}: a mutable holder for the link ({@code long}) that addresses a version chain.
  */
-public interface PageMemoryDataRegion {
-    /**
-     * Returns {@link true} if the date region is persistent.
-     */
-    boolean persistent();
+public class VersionChainLink {
+    public static final int SIZE_IN_BYTES = Long.BYTES; // A link is stored as a single long.
+
+    private long link;
+
+    public VersionChainLink() { // Leaves the link at its default value, 0.
+    }
+
+    public VersionChainLink(long link) {
+        this.link = link;
+    }
+
+    public long link() {
+        return link;
+    }
 
-    /**
-     * Returns page memory, {@code null} if not started.
-     */
-    @Nullable
-    PageMemory pageMemory();
+    public void link(long link) {
+        this.link = link;
+    }
 }
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainTree.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainTree.java
new file mode 100644
index 000000000..d509ee406
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainTree.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.pagememory.reuse.ReuseList;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.io.BplusIo;
+import org.apache.ignite.internal.pagememory.util.PageLockListener;
+import org.apache.ignite.internal.storage.pagememory.mv.io.VersionChainInnerIo;
+import org.apache.ignite.internal.storage.pagememory.mv.io.VersionChainIo;
+import org.apache.ignite.internal.storage.pagememory.mv.io.VersionChainLeafIo;
+import org.apache.ignite.internal.storage.pagememory.mv.io.VersionChainMetaIo;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link BplusTree} of version chains: keys are {@link VersionChainLink}s, rows are {@link VersionChain}s read by link.
+ */
+public class VersionChainTree extends BplusTree<VersionChainLink, VersionChain> {
+    private final int partitionId;
+
+    private final VersionChainDataPageReader dataPageReader;
+
+    /**
+     * Creates the tree, registers the version chain page IOs and initializes the tree via {@code initTree(initNew)}.
+     *
+     * @param grpId Group ID.
+     * @param grpName Group name.
+     * @param pageMem Page memory.
+     * @param lockLsnr Page lock listener.
+     * @param globalRmvId Global remove ID.
+     * @param metaPageId Meta page ID.
+     * @param reuseList Reuse list, may be {@code null}.
+     * @param partitionId ID of the partition in which new tree pages are allocated.
+     * @param initNew {@code True} if new tree should be created.
+     */
+    public VersionChainTree(
+            int grpId,
+            String grpName,
+            PageMemory pageMem,
+            PageLockListener lockLsnr,
+            AtomicLong globalRmvId,
+            long metaPageId,
+            @Nullable ReuseList reuseList,
+            int partitionId,
+            boolean initNew
+    ) throws IgniteInternalCheckedException {
+        super(
+                "VersionChainTree_" + grpId,
+                grpId,
+                grpName,
+                pageMem,
+                lockLsnr,
+                FLAG_AUX,
+                globalRmvId,
+                metaPageId,
+                reuseList
+        );
+
+        this.partitionId = partitionId;
+
+        dataPageReader = new VersionChainDataPageReader(pageMem, grpId, IoStatisticsHolderNoOp.INSTANCE);
+
+        setIos(VersionChainInnerIo.VERSIONS, VersionChainLeafIo.VERSIONS, VersionChainMetaIo.VERSIONS);
+
+        initTree(initNew);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected long allocatePageNoReuse() throws IgniteInternalCheckedException {
+        return pageMem.allocatePage(grpId, partitionId, defaultPageFlag); // Goes straight to page memory; no reuse list involved.
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected int compare(BplusIo<VersionChainLink> io, long pageAddr, int idx, VersionChainLink row) {
+        VersionChainIo versionChainIo = (VersionChainIo) io;
+
+        long thisLink = versionChainIo.link(pageAddr, idx);
+        long thatLink = row.link();
+        return Long.compare(thisLink, thatLink); // Entries are ordered by the signed numeric value of their links.
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public VersionChain getRow(BplusIo<VersionChainLink> io, long pageAddr, int idx, Object x) throws IgniteInternalCheckedException {
+        VersionChainIo rowIo = (VersionChainIo) io;
+
+        long link = rowIo.link(pageAddr, idx);
+
+        return dataPageReader.getRowByLink(link); // The tree page holds only the link; x is not used.
+    }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/RowVersionDataIo.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/RowVersionDataIo.java
new file mode 100644
index 000000000..747a528f6
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/RowVersionDataIo.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv.io;
+
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putByteBuffer;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putInt;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putShort;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo;
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.storage.pagememory.mv.PartitionlessLinks;
+import org.apache.ignite.internal.storage.pagememory.mv.RowVersion;
+import org.apache.ignite.internal.storage.pagememory.mv.Timestamps;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.apache.ignite.lang.IgniteStringBuilder;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Data pages IO for {@link RowVersion}; a row is written as: payload size (short), timestamp, next link, value size (int), value.
+ */
+public class RowVersionDataIo extends AbstractDataPageIo<RowVersion> {
+    /** Page IO type. */
+    public static final short T_VALUE_VERSION_DATA_IO = 11;
+
+    /** I/O versions. */
+    public static final IoVersions<RowVersionDataIo> VERSIONS = new IoVersions<>(new RowVersionDataIo(1));
+
+    /**
+     * Constructor.
+     *
+     * @param ver Page format version.
+     */
+    protected RowVersionDataIo(int ver) {
+        super(T_VALUE_VERSION_DATA_IO, ver);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected void writeRowData(long pageAddr, int dataOff, int payloadSize, RowVersion row, boolean newRow) {
+        assertPageType(pageAddr);
+
+        long addr = pageAddr + dataOff;
+
+        putShort(addr, 0, (short) payloadSize);
+        addr += 2; // Short.BYTES: skip the payload size just written.
+
+        addr += Timestamps.writeTimestamp(addr, 0, row.timestamp());
+
+        addr += PartitionlessLinks.writeToMemory(addr, row.nextLink());
+
+        ByteBuffer value = row.value();
+        putInt(addr, 0, value.limit()); // NOTE(review): limit() is the value size only if value.position() == 0 - confirm.
+        addr += 4; // Integer.BYTES: skip the value size just written.
+
+        putByteBuffer(addr, 0, value);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected void writeFragmentData(RowVersion row, ByteBuffer buf, int rowOff, int payloadSize) {
+        assertPageType(buf);
+
+        // TODO: IGNITE-17009 - implement
+
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    /**
+     * Overwrites the timestamp of a stored row version in place, leaving the rest of the row untouched.
+     *
+     * @param pageAddr  page address
+     * @param itemId    item ID of the slot where row version (or its first fragment) is stored in this page
+     * @param pageSize  size of the page
+     * @param timestamp timestamp to store, may be {@code null}
+     */
+    public void updateTimestamp(long pageAddr, int itemId, int pageSize, @Nullable Timestamp timestamp) {
+        int dataOff = getDataOffset(pageAddr, itemId, pageSize);
+        int payloadOffset = dataOff + Short.BYTES; // Skip the payload size (short) that precedes the row fields.
+
+        Timestamps.writeTimestamp(pageAddr, payloadOffset + RowVersion.TIMESTAMP_OFFSET, timestamp);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected void printPage(long addr, int pageSize, IgniteStringBuilder sb) {
+        sb.app("RowVersionDataIo [\n");
+        printPageLayout(addr, pageSize, sb);
+        sb.app("\n]");
+    }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainDataIo.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainDataIo.java
new file mode 100644
index 000000000..eb18573a5
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainDataIo.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv.io;
+
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putShort;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo;
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.storage.pagememory.mv.PartitionlessLinks;
+import org.apache.ignite.internal.storage.pagememory.mv.TransactionIds;
+import org.apache.ignite.internal.storage.pagememory.mv.VersionChain;
+import org.apache.ignite.lang.IgniteStringBuilder;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link AbstractDataPageIo} for {@link VersionChain} instances; row layout: payload size (short), transaction ID, head link.
+ */
+public class VersionChainDataIo extends AbstractDataPageIo<VersionChain> {
+    /** Page IO type. */
+    public static final short T_VERSION_CHAIN_IO = 7;
+
+    /** I/O versions. */
+    public static final IoVersions<VersionChainDataIo> VERSIONS = new IoVersions<>(new VersionChainDataIo(1));
+
+    /**
+     * Constructor.
+     *
+     * @param ver  Page format version.
+     */
+    protected VersionChainDataIo(int ver) {
+        super(T_VERSION_CHAIN_IO, ver);
+    }
+
+    @Override
+    protected void printPage(long addr, int pageSize, IgniteStringBuilder sb) {
+        sb.app("VersionChainDataIo [\n");
+        printPageLayout(addr, pageSize, sb);
+        sb.app("\n]");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected void writeRowData(long pageAddr, int dataOff, int payloadSize, VersionChain row, boolean newRow) {
+        assertPageType(pageAddr);
+
+        long addr = pageAddr + dataOff;
+
+        putShort(addr, 0, (short) payloadSize);
+        addr += Short.BYTES;
+
+        addr += TransactionIds.writeTransactionId(addr, 0, row.transactionId());
+
+        addr += PartitionlessLinks.writeToMemory(addr, row.headLink()); // Last field; the advanced addr is not read afterwards.
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected void writeFragmentData(VersionChain row, ByteBuffer buf, int rowOff, int payloadSize) {
+        assertPageType(buf);
+
+        throw new UnsupportedOperationException("Splitting version chain rows to fragments is ridiculous!");
+    }
+
+    /**
+     * Overwrites the transaction ID of a stored version chain in place, leaving everything else untouched.
+     *
+     * @param pageAddr page address
+     * @param itemId   number of the item representing the slot where the row of interest resides
+     * @param pageSize size of the page
+     * @param txId     transaction ID to write, may be {@code null}
+     */
+    public void updateTransactionId(long pageAddr, int itemId, int pageSize, @Nullable UUID txId) {
+        int dataOff = getDataOffset(pageAddr, itemId, pageSize);
+        int payloadOffset = dataOff + Short.BYTES; // Skip the payload size (short) that precedes the row fields.
+
+        TransactionIds.writeTransactionId(pageAddr, payloadOffset + VersionChain.TRANSACTION_ID_OFFSET, txId);
+    }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainInnerIo.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainInnerIo.java
new file mode 100644
index 000000000..d24e83ea4
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainInnerIo.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv.io;
+
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
+
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.io.BplusInnerIo;
+import org.apache.ignite.internal.pagememory.tree.io.BplusIo;
+import org.apache.ignite.internal.storage.pagememory.mv.VersionChainLink;
+import org.apache.ignite.internal.storage.pagememory.mv.VersionChainTree;
+
+/**
+ * IO routines for {@link VersionChainTree} inner pages.
+ *
+ * <p>Structure: link(long).
+ */
+public class VersionChainInnerIo extends BplusInnerIo<VersionChainLink> implements VersionChainIo {
+    /** Page IO type. */
+    public static final short T_VERSION_CHAIN_INNER_IO = 9;
+
+    /** I/O versions. */
+    public static final IoVersions<VersionChainInnerIo> VERSIONS = new IoVersions<>(new VersionChainInnerIo(1));
+
+    /**
+     * Constructor.
+     *
+     * @param ver Page format version.
+     */
+    protected VersionChainInnerIo(int ver) {
+        super(T_VERSION_CHAIN_INNER_IO, ver, true, VersionChainLink.SIZE_IN_BYTES);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void store(long dstPageAddr, int dstIdx, BplusIo<VersionChainLink> srcIo, long srcPageAddr, int srcIdx) {
+        assertPageType(dstPageAddr);
+        // The source page is not necessarily an inner page, so it must be read through srcIo rather than this IO.
+        long srcLink = ((VersionChainIo) srcIo).link(srcPageAddr, srcIdx);
+
+        int dstOff = offset(dstIdx);
+
+        putLong(dstPageAddr, dstOff, srcLink);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void storeByOffset(long pageAddr, int off, VersionChainLink row) {
+        assertPageType(pageAddr);
+
+        putLong(pageAddr, off, row.link());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public VersionChainLink getLookupRow(BplusTree<VersionChainLink, ?> tree, long pageAddr, int idx) {
+        long link = link(pageAddr, idx);
+
+        return new VersionChainLink(link);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long link(long pageAddr, int idx) {
+        assert idx < getCount(pageAddr) : idx;
+
+        return getLong(pageAddr, offset(idx));
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainIo.java
similarity index 68%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
copy to modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainIo.java
index dbc6e3c2f..63b42992e 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainIo.java
@@ -15,22 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.pagememory;
-
-import org.jetbrains.annotations.Nullable;
+package org.apache.ignite.internal.storage.pagememory.mv.io;
 
 /**
- * Data region based on {@link PageMemory}.
+ * Common interface of the VersionChain B+Tree inner and leaf page IOs: gives access to the links stored in a page.
  */
-public interface PageMemoryDataRegion {
-    /**
-     * Returns {@link true} if the date region is persistent.
-     */
-    boolean persistent();
-
+public interface VersionChainIo {
     /**
-     * Returns page memory, {@code null} if not started.
+     * Returns the link stored in the page item with the given index.
+     *
+     * @param pageAddr Page address.
+     * @param idx Index of the item within the page.
      */
-    @Nullable
-    PageMemory pageMemory();
+    long link(long pageAddr, int idx);
 }
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainLeafIo.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainLeafIo.java
new file mode 100644
index 000000000..15bbce063
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainLeafIo.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv.io;
+
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
+
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.io.BplusIo;
+import org.apache.ignite.internal.pagememory.tree.io.BplusLeafIo;
+import org.apache.ignite.internal.storage.pagememory.mv.VersionChainLink;
+import org.apache.ignite.internal.storage.pagememory.mv.VersionChainTree;
+
+/**
+ * IO routines for {@link VersionChainTree} leaf pages.
+ *
+ * <p>Structure: link(long).
+ */
+public class VersionChainLeafIo extends BplusLeafIo<VersionChainLink> implements VersionChainIo {
+    /** Page IO type. */
+    public static final short T_VERSION_CHAIN_LEAF_IO = 10;
+
+    /** I/O versions. */
+    public static final IoVersions<VersionChainLeafIo> VERSIONS = new IoVersions<>(new VersionChainLeafIo(1));
+
+    /**
+     * Constructor.
+     *
+     * @param ver Page format version.
+     */
+    protected VersionChainLeafIo(int ver) {
+        super(T_VERSION_CHAIN_LEAF_IO, ver, VersionChainLink.SIZE_IN_BYTES);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void store(long dstPageAddr, int dstIdx, BplusIo<VersionChainLink> srcIo, long srcPageAddr, int srcIdx) {
+        assertPageType(dstPageAddr);
+        // The source page is not necessarily a leaf page, so it must be read through srcIo rather than this IO.
+        long srcLink = ((VersionChainIo) srcIo).link(srcPageAddr, srcIdx);
+
+        int dstOff = offset(dstIdx);
+
+        putLong(dstPageAddr, dstOff, srcLink);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void storeByOffset(long pageAddr, int off, VersionChainLink row) {
+        assertPageType(pageAddr);
+
+        putLong(pageAddr, off, row.link());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public VersionChainLink getLookupRow(BplusTree<VersionChainLink, ?> tree, long pageAddr, int idx) {
+        long link = link(pageAddr, idx);
+
+        return new VersionChainLink(link);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long link(long pageAddr, int idx) {
+        assert idx < getCount(pageAddr) : idx;
+
+        return getLong(pageAddr, offset(idx));
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainMetaIo.java
similarity index 52%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
copy to modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainMetaIo.java
index dbc6e3c2f..ae66188a4 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainMetaIo.java
@@ -15,22 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.pagememory;
+package org.apache.ignite.internal.storage.pagememory.mv.io;
 
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.pagememory.tree.io.BplusMetaIo;
+import org.apache.ignite.internal.storage.pagememory.mv.VersionChainTree;
 
 /**
- * Data region based on {@link PageMemory}.
+ * IO routines for {@link VersionChainTree} meta pages.
  */
-public interface PageMemoryDataRegion {
-    /**
-     * Returns {@link true} if the date region is persistent.
-     */
-    boolean persistent();
+public class VersionChainMetaIo extends BplusMetaIo {
+    /** Page IO type. */
+    public static final short T_VERSION_CHAIN_META_IO = 8;
+
+    /** I/O versions; the only instance registered here has page format version 1. */
+    public static final IoVersions<VersionChainMetaIo> VERSIONS = new IoVersions<>(new VersionChainMetaIo(1));
 
     /**
-     * Returns page memory, {@code null} if not started.
+     * Creates the meta page IO for the given page format version.
+     *
+     * @param ver Page format version.
      */
-    @Nullable
-    PageMemory pageMemory();
+    protected VersionChainMetaIo(int ver) {
+        super(T_VERSION_CHAIN_META_IO, ver);
+    }
 }
diff --git a/modules/storage-page-memory/src/main/resources/META-INF/services/org.apache.ignite.internal.pagememory.io.PageIoModule b/modules/storage-page-memory/src/main/resources/META-INF/services/org.apache.ignite.internal.pagememory.io.PageIoModule
index 262224961..639bb65fe 100644
--- a/modules/storage-page-memory/src/main/resources/META-INF/services/org.apache.ignite.internal.pagememory.io.PageIoModule
+++ b/modules/storage-page-memory/src/main/resources/META-INF/services/org.apache.ignite.internal.pagememory.io.PageIoModule
@@ -15,3 +15,4 @@
 # limitations under the License.
 #
 org.apache.ignite.internal.storage.pagememory.PageMemoryStorageIoModule
+org.apache.ignite.internal.storage.pagememory.mv.PageMemoryMvStorageIoModule
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorageTest.java
new file mode 100644
index 000000000..49e53a80d
--- /dev/null
+++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorageTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.pagememory.configuration.schema.UnsafeMemoryAllocatorConfigurationSchema;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.storage.AbstractMvPartitionStorageTest;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.pagememory.PageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.pagememory.PageMemoryTableStorage;
+import org.apache.ignite.internal.storage.pagememory.configuration.schema.PageMemoryDataStorageChange;
+import org.apache.ignite.internal.storage.pagememory.configuration.schema.PageMemoryDataStorageConfigurationSchema;
+import org.apache.ignite.internal.storage.pagememory.configuration.schema.PageMemoryDataStorageView;
+import org.apache.ignite.internal.storage.pagememory.configuration.schema.PageMemoryStorageEngineConfiguration;
+import org.apache.ignite.internal.storage.pagememory.configuration.schema.PageMemoryStorageEngineConfigurationSchema;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ConfigurationExtension.class)
+@ExtendWith(WorkDirectoryExtension.class)
+class PageMemoryMvPartitionStorageTest extends AbstractMvPartitionStorageTest<PageMemoryMvPartitionStorage> {
+    private final PageIoRegistry ioRegistry = new PageIoRegistry();
+
+    {
+        ioRegistry.loadFromServiceLoader();
+    }
+
+    @InjectConfiguration(polymorphicExtensions = UnsafeMemoryAllocatorConfigurationSchema.class)
+    private PageMemoryStorageEngineConfiguration engineConfig;
+
+    @InjectConfiguration(
+            name = "table",
+            polymorphicExtensions = {
+                    HashIndexConfigurationSchema.class,
+                    UnknownDataStorageConfigurationSchema.class,
+                    PageMemoryDataStorageConfigurationSchema.class
+            }
+    )
+    private TableConfiguration tableCfg;
+
+    private PageMemoryStorageEngine engine;
+
+    private PageMemoryTableStorage table;
+
+    private int nextPageIndex = 100; // NOTE(review): never referenced in this class - candidate for removal.
+
+    @BeforeEach
+    void setUp() throws Exception {
+        engine = new PageMemoryStorageEngine(engineConfig, ioRegistry);
+
+        engine.start();
+
+        tableCfg.change(c -> c.changeDataStorage(dsc -> dsc.convert(PageMemoryDataStorageChange.class)))
+                .get(1, TimeUnit.SECONDS); // Block until the configuration change completes (or time out after 1 second).
+
+        assertEquals(
+                PageMemoryStorageEngineConfigurationSchema.DEFAULT_DATA_REGION_NAME,
+                ((PageMemoryDataStorageView) tableCfg.dataStorage().value()).dataRegion()
+        );
+
+        table = engine.createTable(tableCfg);
+        table.start();
+
+        storage = table.createMvPartitionStorage(partitionId());
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        IgniteUtils.closeAll(
+                storage,
+                table == null ? null : table::stop,
+                engine == null ? null : engine::stop
+        );
+    }
+
+    @Override
+    protected int partitionId() {
+        // 1 instead of the default 0, so that we notice cases where we forget to pass the partition ID (in which
+        // case it would turn into 0).
+        return 1;
+    }
+
+    @SuppressWarnings("JUnit3StyleTestMethodInJUnit4Class")
+    @Override
+    public void testReadsFromEmpty() {
+        // TODO: enable the test when https://issues.apache.org/jira/browse/IGNITE-17006 is implemented
+
+        // The empty override effectively disables the inherited test: it makes no sense for this kind of storage,
+        // as attempts to read using random pageIds will result in trouble.
+    }
+
+    @Test
+    void abortOfInsertMakesRowIdInvalidForAddWrite() {
+        RowId rowId = storage.insert(binaryRow, newTransactionId());
+        storage.abortWrite(rowId);
+
+        assertThrows(RowIdIsInvalidForModificationsException.class, () -> storage.addWrite(rowId, binaryRow2, txId));
+    }
+
+    @Test
+    void abortOfInsertMakesRowIdInvalidForCommitWrite() {
+        RowId rowId = storage.insert(binaryRow, newTransactionId());
+        storage.abortWrite(rowId);
+
+        assertThrows(RowIdIsInvalidForModificationsException.class, () -> storage.commitWrite(rowId, Timestamp.nextVersion()));
+    }
+
+    @Test
+    void abortOfInsertMakesRowIdInvalidForAbortWrite() {
+        RowId rowId = storage.insert(binaryRow, newTransactionId());
+        storage.abortWrite(rowId);
+
+        assertThrows(RowIdIsInvalidForModificationsException.class, () -> storage.abortWrite(rowId));
+    }
+}
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PartitionlessLinksTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PartitionlessLinksTest.java
new file mode 100644
index 000000000..3ee377c93
--- /dev/null
+++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PartitionlessLinksTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import org.junit.jupiter.api.Test;
+
+class PartitionlessLinksTest {
+    @Test
+    void removesPartitionIdFromFullLink() {
+        // 0xABCD (bits 32..47 of the full link) is the partitionId.
+        long fullLink = 0x7733ABCDDEADBEEFL;
+
+        long partitionlessLink = PartitionlessLinks.removePartitionIdFromLink(fullLink);
+        // The partitionId is expected to be cut out, with the bits above it (0x7733) moved down to close the gap.
+        assertThat(partitionlessLink, is(0x7733DEADBEEFL));
+    }
+
+    @Test
+    void addsPartitionId() {
+        long partitionlessLink = 0x7733DEADBEEFL;
+        // NOTE(review): "Partitition" in the method name below looks like a typo; renaming must start at its declaration.
+        long fullLink = PartitionlessLinks.addPartitionIdToPartititionlessLink(partitionlessLink, 0xABCD);
+        // 0xABCD is expected to be inserted between the upper 16 bits (0x7733) and the lower 32 bits (0xDEADBEEF).
+        assertThat(fullLink, is(0x7733ABCDDEADBEEFL));
+    }
+
+    @Test
+    void removalAndAdditionOfPartitionIdRestoresLink() {
+        long fullLink = 0x7733ABCDDEADBEEFL;
+        // The partitionId added back (0xABCD) is the one embedded in fullLink, so the round trip must restore it exactly.
+        long partitionlessLink = PartitionlessLinks.removePartitionIdFromLink(fullLink);
+        long reconstructedLink = PartitionlessLinks.addPartitionIdToPartititionlessLink(partitionlessLink, 0xABCD);
+
+        assertThat(reconstructedLink, is(fullLink));
+    }
+
+    @Test
+    void extractsHigh16Bits() {
+        short high16Bits = PartitionlessLinks.high16Bits(0x7733DEADBEEFL);
+        // For this 48-bit partitionless link, the expected "high 16 bits" are bits 32..47 (0x7733).
+        assertThat(high16Bits, is((short) 0x7733));
+    }
+
+    @Test
+    void extractsLow32Bits() {
+        int low32Bits = PartitionlessLinks.low32Bits(0x7733DEADBEEFL);
+        // 0xDEADBEEF is an int literal here (negative as a signed int); only the bit pattern is being compared.
+        assertThat(low32Bits, is(0xDEADBEEF));
+    }
+}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 21b57fe49..0b4949d3c 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -30,6 +30,7 @@ import java.util.function.Predicate;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.NoUncommittedVersionException;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
@@ -212,6 +213,9 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
         try {
             byte[] previousValue = db.get(cf, keyBuf.array(), 0, ROW_PREFIX_SIZE);
+            if (previousValue == null) {
+                throw new NoUncommittedVersionException();
+            }
 
             // Perform unconditional remove for the key without associated timestamp.
             db.delete(cf, writeOpts, keyBuf.array(), 0, ROW_PREFIX_SIZE);
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
index d7323c039..1b1573e3a 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
@@ -29,7 +29,6 @@ import org.apache.ignite.configuration.schemas.table.TableConfiguration;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.storage.AbstractMvPartitionStorageTest;
-import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageChange;
 import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageConfigurationSchema;
 import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageView;
@@ -46,13 +45,11 @@ import org.junit.jupiter.api.extension.ExtendWith;
  */
 @ExtendWith(WorkDirectoryExtension.class)
 @ExtendWith(ConfigurationExtension.class)
-public class RocksDbMvPartitionStorageTest extends AbstractMvPartitionStorageTest {
+public class RocksDbMvPartitionStorageTest extends AbstractMvPartitionStorageTest<RocksDbMvPartitionStorage> {
     private RocksDbStorageEngine engine;
 
     private RocksDbTableStorage table;
 
-    private RocksDbMvPartitionStorage storage;
-
     @BeforeEach
     public void setUp(
             @WorkDirectory Path workDir,
@@ -92,8 +89,4 @@ public class RocksDbMvPartitionStorageTest extends AbstractMvPartitionStorageTes
         );
     }
 
-    @Override
-    protected MvPartitionStorage partitionStorage() {
-        return storage;
-    }
 }
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/Timestamp.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/Timestamp.java
index 32adc54a0..e3c8640ae 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/Timestamp.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/Timestamp.java
@@ -76,7 +76,17 @@ public class Timestamp implements Comparable<Timestamp>, Serializable {
     @Override
     public int compareTo(@NotNull Timestamp other) {
-        return (this.timestamp < other.timestamp ? -1 : (this.timestamp > other.timestamp ? 1 :
-                (this.nodeId < other.nodeId ? -1 : (this.nodeId > other.nodeId ? 1 : 0))));
+        int cmp = Long.compare(this.timestamp, other.timestamp);
+        return cmp != 0 ? cmp : Long.compare(this.nodeId, other.nodeId); // nodeId only breaks ties between equal timestamps.
+    }
+
+    /**
+     * Returns {@code true} if this timestamp is before or equal to the given one, as ordered by {@link #compareTo(Timestamp)}.
+     *
+     * @param that Timestamp to compare this one with.
+     * @return {@code true} if {@code this.compareTo(that) <= 0}.
+     */
+    public boolean beforeOrEquals(Timestamp that) {
+        return this.compareTo(that) <= 0;
     }
 
     /**