You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/09/19 15:06:52 UTC

[ignite-3] 01/01: IGNITE-17627 Extend MvPartitionStorage read API with write intent resolution capabilities

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-17627
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 45e228da7c2db45e2a3aa3b32fac90eaa1faa796
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Mon Sep 19 19:00:42 2022 +0400

    IGNITE-17627 Extend MvPartitionStorage read API with write intent resolution capabilities
---
 .../apache/ignite/internal/util/GridUnsafe.java    |  52 ++--
 .../internal/util/io/IgniteUnsafeDataInput.java    |  22 +-
 .../internal/util/io/IgniteUnsafeDataOutput.java   |  22 +-
 .../stream/DirectByteBufferStreamImplV1.java       |  44 +--
 .../internal/storage/MvPartitionStorage.java       |  30 +-
 .../apache/ignite/internal/storage/ReadResult.java | 125 ++++++++
 .../storage/AbstractMvPartitionStorageTest.java    | 296 ++++++++++++++++--
 .../storage/AbstractMvTableStorageTest.java        |   4 +-
 .../TestConcurrentHashMapMvPartitionStorage.java   | 120 ++++++--
 .../mv/AbstractPageMemoryMvPartitionStorage.java   | 113 +++++--
 .../internal/storage/pagememory/mv/RowVersion.java |   2 +-
 .../pagememory/mv/ScanVersionChainByTimestamp.java |  80 ++++-
 .../storage/pagememory/mv/VersionChain.java        |  31 +-
 .../storage/pagememory/mv/io/VersionChainIo.java   |  40 ++-
 .../storage/rocksdb/RocksDbMvPartitionStorage.java | 330 +++++++++++++++++----
 .../storage/rocksdb/RocksDbMvTableStorageTest.java |   4 +-
 .../distributed/storage/VersionedRowStore.java     |  11 +-
 17 files changed, 1084 insertions(+), 242 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
index 2d37a5ce4a..12af2626a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
@@ -73,8 +73,8 @@ public abstract class GridUnsafe {
     /** Per-byte copy threshold. */
     private static final long PER_BYTE_THRESHOLD = 0L;
 
-    /** Big endian. */
-    public static final boolean BIG_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN;
+    /** Flag indicating whether system's native order is big endian. */
+    public static final boolean IS_BIG_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN;
 
     /** Address size. */
     public static final int ADDR_SIZE = UNSAFE.addressSize();
@@ -571,7 +571,7 @@ public abstract class GridUnsafe {
      * @return Short value from byte array.
      */
     public static short getShort(byte[] arr, long off) {
-        return UNALIGNED ? UNSAFE.getShort(arr, off) : getShortByByte(arr, off, BIG_ENDIAN);
+        return UNALIGNED ? UNSAFE.getShort(arr, off) : getShortByByte(arr, off, IS_BIG_ENDIAN);
     }
 
     /**
@@ -581,7 +581,7 @@ public abstract class GridUnsafe {
      * @return Short value from given address.
      */
     public static short getShort(long addr) {
-        return UNALIGNED ? UNSAFE.getShort(addr) : getShortByByte(addr, BIG_ENDIAN);
+        return UNALIGNED ? UNSAFE.getShort(addr) : getShortByByte(addr, IS_BIG_ENDIAN);
     }
 
     /**
@@ -595,7 +595,7 @@ public abstract class GridUnsafe {
         if (UNALIGNED) {
             UNSAFE.putShort(arr, off, val);
         } else {
-            putShortByByte(arr, off, val, BIG_ENDIAN);
+            putShortByByte(arr, off, val, IS_BIG_ENDIAN);
         }
     }
 
@@ -609,7 +609,7 @@ public abstract class GridUnsafe {
         if (UNALIGNED) {
             UNSAFE.putShort(addr, val);
         } else {
-            putShortByByte(addr, val, BIG_ENDIAN);
+            putShortByByte(addr, val, IS_BIG_ENDIAN);
         }
     }
 
@@ -621,7 +621,7 @@ public abstract class GridUnsafe {
      * @return Char value from byte array.
      */
     public static char getChar(byte[] arr, long off) {
-        return UNALIGNED ? UNSAFE.getChar(arr, off) : getCharByByte(arr, off, BIG_ENDIAN);
+        return UNALIGNED ? UNSAFE.getChar(arr, off) : getCharByByte(arr, off, IS_BIG_ENDIAN);
     }
 
     /**
@@ -631,7 +631,7 @@ public abstract class GridUnsafe {
      * @return Char value from given address.
      */
     public static char getChar(long addr) {
-        return UNALIGNED ? UNSAFE.getChar(addr) : getCharByByte(addr, BIG_ENDIAN);
+        return UNALIGNED ? UNSAFE.getChar(addr) : getCharByByte(addr, IS_BIG_ENDIAN);
     }
 
     /**
@@ -645,7 +645,7 @@ public abstract class GridUnsafe {
         if (UNALIGNED) {
             UNSAFE.putChar(arr, off, val);
         } else {
-            putCharByByte(arr, off, val, BIG_ENDIAN);
+            putCharByByte(arr, off, val, IS_BIG_ENDIAN);
         }
     }
 
@@ -659,7 +659,7 @@ public abstract class GridUnsafe {
         if (UNALIGNED) {
             UNSAFE.putChar(addr, val);
         } else {
-            putCharByByte(addr, val, BIG_ENDIAN);
+            putCharByByte(addr, val, IS_BIG_ENDIAN);
         }
     }
 
@@ -671,7 +671,7 @@ public abstract class GridUnsafe {
      * @return Integer value from byte array.
      */
     public static int getInt(byte[] arr, long off) {
-        return UNALIGNED ? UNSAFE.getInt(arr, off) : getIntByByte(arr, off, BIG_ENDIAN);
+        return UNALIGNED ? UNSAFE.getInt(arr, off) : getIntByByte(arr, off, IS_BIG_ENDIAN);
     }
 
     /**
@@ -681,7 +681,7 @@ public abstract class GridUnsafe {
      * @return Integer value from given address.
      */
     public static int getInt(long addr) {
-        return UNALIGNED ? UNSAFE.getInt(addr) : getIntByByte(addr, BIG_ENDIAN);
+        return UNALIGNED ? UNSAFE.getInt(addr) : getIntByByte(addr, IS_BIG_ENDIAN);
     }
 
     /**
@@ -695,7 +695,7 @@ public abstract class GridUnsafe {
         if (UNALIGNED) {
             UNSAFE.putInt(arr, off, val);
         } else {
-            putIntByByte(arr, off, val, BIG_ENDIAN);
+            putIntByByte(arr, off, val, IS_BIG_ENDIAN);
         }
     }
 
@@ -709,7 +709,7 @@ public abstract class GridUnsafe {
         if (UNALIGNED) {
             UNSAFE.putInt(addr, val);
         } else {
-            putIntByByte(addr, val, BIG_ENDIAN);
+            putIntByByte(addr, val, IS_BIG_ENDIAN);
         }
     }
 
@@ -721,7 +721,7 @@ public abstract class GridUnsafe {
      * @return Long value from byte array.
      */
     public static long getLong(byte[] arr, long off) {
-        return UNALIGNED ? UNSAFE.getLong(arr, off) : getLongByByte(arr, off, BIG_ENDIAN);
+        return UNALIGNED ? UNSAFE.getLong(arr, off) : getLongByByte(arr, off, IS_BIG_ENDIAN);
     }
 
     /**
@@ -731,7 +731,7 @@ public abstract class GridUnsafe {
      * @return Long value from given address.
      */
     public static long getLong(long addr) {
-        return UNALIGNED ? UNSAFE.getLong(addr) : getLongByByte(addr, BIG_ENDIAN);
+        return UNALIGNED ? UNSAFE.getLong(addr) : getLongByByte(addr, IS_BIG_ENDIAN);
     }
 
     /**
@@ -745,7 +745,7 @@ public abstract class GridUnsafe {
         if (UNALIGNED) {
             UNSAFE.putLong(arr, off, val);
         } else {
-            putLongByByte(arr, off, val, BIG_ENDIAN);
+            putLongByByte(arr, off, val, IS_BIG_ENDIAN);
         }
     }
 
@@ -759,7 +759,7 @@ public abstract class GridUnsafe {
         if (UNALIGNED) {
             UNSAFE.putLong(addr, val);
         } else {
-            putLongByByte(addr, val, BIG_ENDIAN);
+            putLongByByte(addr, val, IS_BIG_ENDIAN);
         }
     }
 
@@ -771,7 +771,7 @@ public abstract class GridUnsafe {
      * @return Float value from byte array.
      */
     public static float getFloat(byte[] arr, long off) {
-        return UNALIGNED ? UNSAFE.getFloat(arr, off) : Float.intBitsToFloat(getIntByByte(arr, off, BIG_ENDIAN));
+        return UNALIGNED ? UNSAFE.getFloat(arr, off) : Float.intBitsToFloat(getIntByByte(arr, off, IS_BIG_ENDIAN));
     }
 
     /**
@@ -781,7 +781,7 @@ public abstract class GridUnsafe {
      * @return Float value from given address.
      */
     public static float getFloat(long addr) {
-        return UNALIGNED ? UNSAFE.getFloat(addr) : Float.intBitsToFloat(getIntByByte(addr, BIG_ENDIAN));
+        return UNALIGNED ? UNSAFE.getFloat(addr) : Float.intBitsToFloat(getIntByByte(addr, IS_BIG_ENDIAN));
     }
 
     /**
@@ -795,7 +795,7 @@ public abstract class GridUnsafe {
         if (UNALIGNED) {
             UNSAFE.putFloat(arr, off, val);
         } else {
-            putIntByByte(arr, off, Float.floatToIntBits(val), BIG_ENDIAN);
+            putIntByByte(arr, off, Float.floatToIntBits(val), IS_BIG_ENDIAN);
         }
     }
 
@@ -809,7 +809,7 @@ public abstract class GridUnsafe {
         if (UNALIGNED) {
             UNSAFE.putFloat(addr, val);
         } else {
-            putIntByByte(addr, Float.floatToIntBits(val), BIG_ENDIAN);
+            putIntByByte(addr, Float.floatToIntBits(val), IS_BIG_ENDIAN);
         }
     }
 
@@ -821,7 +821,7 @@ public abstract class GridUnsafe {
      * @return Double value from byte array. Alignment aware.
      */
     public static double getDouble(byte[] arr, long off) {
-        return UNALIGNED ? UNSAFE.getDouble(arr, off) : Double.longBitsToDouble(getLongByByte(arr, off, BIG_ENDIAN));
+        return UNALIGNED ? UNSAFE.getDouble(arr, off) : Double.longBitsToDouble(getLongByByte(arr, off, IS_BIG_ENDIAN));
     }
 
     /**
@@ -831,7 +831,7 @@ public abstract class GridUnsafe {
      * @return Double value from given address.
      */
     public static double getDouble(long addr) {
-        return UNALIGNED ? UNSAFE.getDouble(addr) : Double.longBitsToDouble(getLongByByte(addr, BIG_ENDIAN));
+        return UNALIGNED ? UNSAFE.getDouble(addr) : Double.longBitsToDouble(getLongByByte(addr, IS_BIG_ENDIAN));
     }
 
     /**
@@ -845,7 +845,7 @@ public abstract class GridUnsafe {
         if (UNALIGNED) {
             UNSAFE.putDouble(arr, off, val);
         } else {
-            putLongByByte(arr, off, Double.doubleToLongBits(val), BIG_ENDIAN);
+            putLongByByte(arr, off, Double.doubleToLongBits(val), IS_BIG_ENDIAN);
         }
     }
 
@@ -859,7 +859,7 @@ public abstract class GridUnsafe {
         if (UNALIGNED) {
             UNSAFE.putDouble(addr, val);
         } else {
-            putLongByByte(addr, Double.doubleToLongBits(val), BIG_ENDIAN);
+            putLongByByte(addr, Double.doubleToLongBits(val), IS_BIG_ENDIAN);
         }
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInput.java b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInput.java
index 3cb0ae1a69..03a7e4f227 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInput.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInput.java
@@ -17,12 +17,12 @@
 
 package org.apache.ignite.internal.util.io;
 
-import static org.apache.ignite.internal.util.GridUnsafe.BIG_ENDIAN;
 import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
 import static org.apache.ignite.internal.util.GridUnsafe.CHAR_ARR_OFF;
 import static org.apache.ignite.internal.util.GridUnsafe.DOUBLE_ARR_OFF;
 import static org.apache.ignite.internal.util.GridUnsafe.FLOAT_ARR_OFF;
 import static org.apache.ignite.internal.util.GridUnsafe.INT_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.IS_BIG_ENDIAN;
 import static org.apache.ignite.internal.util.GridUnsafe.LONG_ARR_OFF;
 import static org.apache.ignite.internal.util.GridUnsafe.SHORT_ARR_OFF;
 
@@ -274,7 +274,7 @@ public class IgniteUnsafeDataInput extends InputStream implements IgniteDataInpu
 
         long off = BYTE_ARR_OFF + advanceOffset(bytesToCp);
 
-        if (BIG_ENDIAN) {
+        if (IS_BIG_ENDIAN) {
             for (int i = 0; i < arr.length; i++) {
                 arr[i] = GridUnsafe.getShortLittleEndian(buf, off);
 
@@ -298,7 +298,7 @@ public class IgniteUnsafeDataInput extends InputStream implements IgniteDataInpu
 
         long off = BYTE_ARR_OFF + advanceOffset(bytesToCp);
 
-        if (BIG_ENDIAN) {
+        if (IS_BIG_ENDIAN) {
             for (int i = 0; i < arr.length; i++) {
                 arr[i] = GridUnsafe.getIntLittleEndian(buf, off);
 
@@ -322,7 +322,7 @@ public class IgniteUnsafeDataInput extends InputStream implements IgniteDataInpu
 
         long off = BYTE_ARR_OFF + advanceOffset(bytesToCp);
 
-        if (BIG_ENDIAN) {
+        if (IS_BIG_ENDIAN) {
             for (int i = 0; i < arr.length; i++) {
                 arr[i] = GridUnsafe.getDoubleLittleEndian(buf, off);
 
@@ -358,7 +358,7 @@ public class IgniteUnsafeDataInput extends InputStream implements IgniteDataInpu
 
         long off = BYTE_ARR_OFF + advanceOffset(bytesToCp);
 
-        if (BIG_ENDIAN) {
+        if (IS_BIG_ENDIAN) {
             for (int i = 0; i < arr.length; i++) {
                 arr[i] = GridUnsafe.getCharLittleEndian(buf, off);
 
@@ -382,7 +382,7 @@ public class IgniteUnsafeDataInput extends InputStream implements IgniteDataInpu
 
         long off = BYTE_ARR_OFF + advanceOffset(bytesToCp);
 
-        if (BIG_ENDIAN) {
+        if (IS_BIG_ENDIAN) {
             for (int i = 0; i < arr.length; i++) {
                 arr[i] = GridUnsafe.getLongLittleEndian(buf, off);
 
@@ -406,7 +406,7 @@ public class IgniteUnsafeDataInput extends InputStream implements IgniteDataInpu
 
         long off = BYTE_ARR_OFF + advanceOffset(bytesToCp);
 
-        if (BIG_ENDIAN) {
+        if (IS_BIG_ENDIAN) {
             for (int i = 0; i < arr.length; i++) {
                 arr[i] = GridUnsafe.getFloatLittleEndian(buf, off);
 
@@ -495,7 +495,7 @@ public class IgniteUnsafeDataInput extends InputStream implements IgniteDataInpu
 
         long off = BYTE_ARR_OFF + advanceOffset(2);
 
-        return BIG_ENDIAN ? GridUnsafe.getShortLittleEndian(buf, off) : GridUnsafe.getShort(buf, off);
+        return IS_BIG_ENDIAN ? GridUnsafe.getShortLittleEndian(buf, off) : GridUnsafe.getShort(buf, off);
     }
 
     /** {@inheritDoc} */
@@ -511,7 +511,7 @@ public class IgniteUnsafeDataInput extends InputStream implements IgniteDataInpu
 
         long off = BYTE_ARR_OFF + this.off;
 
-        char v = BIG_ENDIAN ? GridUnsafe.getCharLittleEndian(buf, off) : GridUnsafe.getChar(buf, off);
+        char v = IS_BIG_ENDIAN ? GridUnsafe.getCharLittleEndian(buf, off) : GridUnsafe.getChar(buf, off);
 
         advanceOffset(2);
 
@@ -525,7 +525,7 @@ public class IgniteUnsafeDataInput extends InputStream implements IgniteDataInpu
 
         long off = BYTE_ARR_OFF + advanceOffset(4);
 
-        return BIG_ENDIAN ? GridUnsafe.getIntLittleEndian(buf, off) : GridUnsafe.getInt(buf, off);
+        return IS_BIG_ENDIAN ? GridUnsafe.getIntLittleEndian(buf, off) : GridUnsafe.getInt(buf, off);
     }
 
     /** {@inheritDoc} */
@@ -535,7 +535,7 @@ public class IgniteUnsafeDataInput extends InputStream implements IgniteDataInpu
 
         long off = BYTE_ARR_OFF + advanceOffset(8);
 
-        return BIG_ENDIAN ? GridUnsafe.getLongLittleEndian(buf, off) : GridUnsafe.getLong(buf, off);
+        return IS_BIG_ENDIAN ? GridUnsafe.getLongLittleEndian(buf, off) : GridUnsafe.getLong(buf, off);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataOutput.java b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataOutput.java
index 29e27fd353..f0835aa814 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataOutput.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataOutput.java
@@ -17,12 +17,12 @@
 
 package org.apache.ignite.internal.util.io;
 
-import static org.apache.ignite.internal.util.GridUnsafe.BIG_ENDIAN;
 import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
 import static org.apache.ignite.internal.util.GridUnsafe.CHAR_ARR_OFF;
 import static org.apache.ignite.internal.util.GridUnsafe.DOUBLE_ARR_OFF;
 import static org.apache.ignite.internal.util.GridUnsafe.FLOAT_ARR_OFF;
 import static org.apache.ignite.internal.util.GridUnsafe.INT_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.IS_BIG_ENDIAN;
 import static org.apache.ignite.internal.util.GridUnsafe.LONG_ARR_OFF;
 import static org.apache.ignite.internal.util.GridUnsafe.SHORT_ARR_OFF;
 
@@ -266,7 +266,7 @@ public class IgniteUnsafeDataOutput extends OutputStream implements IgniteDataOu
 
         requestFreeSize(bytesToCp);
 
-        if (BIG_ENDIAN) {
+        if (IS_BIG_ENDIAN) {
             long off = BYTE_ARR_OFF + this.off;
 
             for (double val : arr) {
@@ -282,7 +282,7 @@ public class IgniteUnsafeDataOutput extends OutputStream implements IgniteDataOu
     }
 
     private void putInt(int val, long off) {
-        if (BIG_ENDIAN) {
+        if (IS_BIG_ENDIAN) {
             GridUnsafe.putIntLittleEndian(bytes, off, val);
         } else {
             GridUnsafe.putInt(bytes, off, val);
@@ -307,7 +307,7 @@ public class IgniteUnsafeDataOutput extends OutputStream implements IgniteDataOu
 
         requestFreeSize(bytesToCp);
 
-        if (BIG_ENDIAN) {
+        if (IS_BIG_ENDIAN) {
             long off = BYTE_ARR_OFF + this.off;
 
             for (char val : arr) {
@@ -331,7 +331,7 @@ public class IgniteUnsafeDataOutput extends OutputStream implements IgniteDataOu
 
         requestFreeSize(bytesToCp);
 
-        if (BIG_ENDIAN) {
+        if (IS_BIG_ENDIAN) {
             long off = BYTE_ARR_OFF + this.off;
 
             for (long val : arr) {
@@ -355,7 +355,7 @@ public class IgniteUnsafeDataOutput extends OutputStream implements IgniteDataOu
 
         requestFreeSize(bytesToCp);
 
-        if (BIG_ENDIAN) {
+        if (IS_BIG_ENDIAN) {
             long off = BYTE_ARR_OFF + this.off;
 
             for (float val : arr) {
@@ -397,7 +397,7 @@ public class IgniteUnsafeDataOutput extends OutputStream implements IgniteDataOu
 
         requestFreeSize(bytesToCp);
 
-        if (BIG_ENDIAN) {
+        if (IS_BIG_ENDIAN) {
             long off = BYTE_ARR_OFF + this.off;
 
             for (short val : arr) {
@@ -421,7 +421,7 @@ public class IgniteUnsafeDataOutput extends OutputStream implements IgniteDataOu
 
         requestFreeSize(bytesToCp);
 
-        if (BIG_ENDIAN) {
+        if (IS_BIG_ENDIAN) {
             long off = BYTE_ARR_OFF + this.off;
 
             for (int val : arr) {
@@ -481,7 +481,7 @@ public class IgniteUnsafeDataOutput extends OutputStream implements IgniteDataOu
     }
 
     private void putShort(short val, long off) {
-        if (BIG_ENDIAN) {
+        if (IS_BIG_ENDIAN) {
             GridUnsafe.putShortLittleEndian(bytes, off, val);
         } else {
             GridUnsafe.putShort(bytes, off, val);
@@ -497,7 +497,7 @@ public class IgniteUnsafeDataOutput extends OutputStream implements IgniteDataOu
 
         long off = BYTE_ARR_OFF + this.off;
 
-        if (BIG_ENDIAN) {
+        if (IS_BIG_ENDIAN) {
             GridUnsafe.putCharLittleEndian(bytes, off, val);
         } else {
             GridUnsafe.putChar(bytes, off, val);
@@ -525,7 +525,7 @@ public class IgniteUnsafeDataOutput extends OutputStream implements IgniteDataOu
 
         long off = BYTE_ARR_OFF + this.off;
 
-        if (BIG_ENDIAN) {
+        if (IS_BIG_ENDIAN) {
             GridUnsafe.putLongLittleEndian(bytes, off, v);
         } else {
             GridUnsafe.putLong(bytes, off, v);
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
index c6b139875c..64f89700f4 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
@@ -26,12 +26,12 @@ import static org.apache.ignite.internal.util.ArrayUtils.FLOAT_ARRAY;
 import static org.apache.ignite.internal.util.ArrayUtils.INT_ARRAY;
 import static org.apache.ignite.internal.util.ArrayUtils.LONG_ARRAY;
 import static org.apache.ignite.internal.util.ArrayUtils.SHORT_ARRAY;
-import static org.apache.ignite.internal.util.GridUnsafe.BIG_ENDIAN;
 import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
 import static org.apache.ignite.internal.util.GridUnsafe.CHAR_ARR_OFF;
 import static org.apache.ignite.internal.util.GridUnsafe.DOUBLE_ARR_OFF;
 import static org.apache.ignite.internal.util.GridUnsafe.FLOAT_ARR_OFF;
 import static org.apache.ignite.internal.util.GridUnsafe.INT_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.IS_BIG_ENDIAN;
 import static org.apache.ignite.internal.util.GridUnsafe.LONG_ARR_OFF;
 import static org.apache.ignite.internal.util.GridUnsafe.SHORT_ARR_OFF;
 
@@ -204,7 +204,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
 
             long off = baseOff + pos;
 
-            if (BIG_ENDIAN) {
+            if (IS_BIG_ENDIAN) {
                 GridUnsafe.putShortLittleEndian(heapArr, off, val);
             } else {
                 GridUnsafe.putShort(heapArr, off, val);
@@ -280,7 +280,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
 
             long off = baseOff + pos;
 
-            if (BIG_ENDIAN) {
+            if (IS_BIG_ENDIAN) {
                 GridUnsafe.putFloatLittleEndian(heapArr, off, val);
             } else {
                 GridUnsafe.putFloat(heapArr, off, val);
@@ -300,7 +300,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
 
             long off = baseOff + pos;
 
-            if (BIG_ENDIAN) {
+            if (IS_BIG_ENDIAN) {
                 GridUnsafe.putDoubleLittleEndian(heapArr, off, val);
             } else {
                 GridUnsafe.putDouble(heapArr, off, val);
@@ -320,7 +320,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
 
             long off = baseOff + pos;
 
-            if (BIG_ENDIAN) {
+            if (IS_BIG_ENDIAN) {
                 GridUnsafe.putCharLittleEndian(heapArr, off, val);
             } else {
                 GridUnsafe.putChar(heapArr, off, val);
@@ -368,7 +368,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
     @Override
     public void writeShortArray(short[] val) {
         if (val != null) {
-            if (BIG_ENDIAN) {
+            if (IS_BIG_ENDIAN) {
                 lastFinished = writeArrayLittleEndian(val, SHORT_ARR_OFF, val.length, 2, 1);
             } else {
                 lastFinished = writeArray(val, SHORT_ARR_OFF, val.length, val.length << 1);
@@ -382,7 +382,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
     @Override
     public void writeIntArray(int[] val) {
         if (val != null) {
-            if (BIG_ENDIAN) {
+            if (IS_BIG_ENDIAN) {
                 lastFinished = writeArrayLittleEndian(val, INT_ARR_OFF, val.length, 4, 2);
             } else {
                 lastFinished = writeArray(val, INT_ARR_OFF, val.length, val.length << 2);
@@ -396,7 +396,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
     @Override
     public void writeLongArray(long[] val) {
         if (val != null) {
-            if (BIG_ENDIAN) {
+            if (IS_BIG_ENDIAN) {
                 lastFinished = writeArrayLittleEndian(val, LONG_ARR_OFF, val.length, 8, 3);
             } else {
                 lastFinished = writeArray(val, LONG_ARR_OFF, val.length, val.length << 3);
@@ -410,7 +410,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
     @Override
     public void writeLongArray(long[] val, int len) {
         if (val != null) {
-            if (BIG_ENDIAN) {
+            if (IS_BIG_ENDIAN) {
                 lastFinished = writeArrayLittleEndian(val, LONG_ARR_OFF, len, 8, 3);
             } else {
                 lastFinished = writeArray(val, LONG_ARR_OFF, len, len << 3);
@@ -424,7 +424,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
     @Override
     public void writeFloatArray(float[] val) {
         if (val != null) {
-            if (BIG_ENDIAN) {
+            if (IS_BIG_ENDIAN) {
                 lastFinished = writeArrayLittleEndian(val, FLOAT_ARR_OFF, val.length, 4, 2);
             } else {
                 lastFinished = writeArray(val, FLOAT_ARR_OFF, val.length, val.length << 2);
@@ -438,7 +438,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
     @Override
     public void writeDoubleArray(double[] val) {
         if (val != null) {
-            if (BIG_ENDIAN) {
+            if (IS_BIG_ENDIAN) {
                 lastFinished = writeArrayLittleEndian(val, DOUBLE_ARR_OFF, val.length, 8, 3);
             } else {
                 lastFinished = writeArray(val, DOUBLE_ARR_OFF, val.length, val.length << 3);
@@ -452,7 +452,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
     @Override
     public void writeCharArray(char[] val) {
         if (val != null) {
-            if (BIG_ENDIAN) {
+            if (IS_BIG_ENDIAN) {
                 lastFinished = writeArrayLittleEndian(val, CHAR_ARR_OFF, val.length, 2, 1);
             } else {
                 lastFinished = writeArray(val, CHAR_ARR_OFF, val.length, val.length << 1);
@@ -807,7 +807,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
 
             long off = baseOff + pos;
 
-            return BIG_ENDIAN ? GridUnsafe.getShortLittleEndian(heapArr, off) : GridUnsafe.getShort(heapArr, off);
+            return IS_BIG_ENDIAN ? GridUnsafe.getShortLittleEndian(heapArr, off) : GridUnsafe.getShort(heapArr, off);
         } else {
             return 0;
         }
@@ -903,7 +903,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
 
             long off = baseOff + pos;
 
-            return BIG_ENDIAN ? GridUnsafe.getFloatLittleEndian(heapArr, off) : GridUnsafe.getFloat(heapArr, off);
+            return IS_BIG_ENDIAN ? GridUnsafe.getFloatLittleEndian(heapArr, off) : GridUnsafe.getFloat(heapArr, off);
         } else {
             return 0;
         }
@@ -921,7 +921,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
 
             long off = baseOff + pos;
 
-            return BIG_ENDIAN ? GridUnsafe.getDoubleLittleEndian(heapArr, off) : GridUnsafe.getDouble(heapArr, off);
+            return IS_BIG_ENDIAN ? GridUnsafe.getDoubleLittleEndian(heapArr, off) : GridUnsafe.getDouble(heapArr, off);
         } else {
             return 0;
         }
@@ -939,7 +939,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
 
             long off = baseOff + pos;
 
-            return BIG_ENDIAN ? GridUnsafe.getCharLittleEndian(heapArr, off) : GridUnsafe.getChar(heapArr, off);
+            return IS_BIG_ENDIAN ? GridUnsafe.getCharLittleEndian(heapArr, off) : GridUnsafe.getChar(heapArr, off);
         } else {
             return 0;
         }
@@ -970,7 +970,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
     /** {@inheritDoc} */
     @Override
     public short[] readShortArray() {
-        if (BIG_ENDIAN) {
+        if (IS_BIG_ENDIAN) {
             return readArrayLittleEndian(SHORT_ARRAY, 2, 1, SHORT_ARR_OFF);
         } else {
             return readArray(SHORT_ARRAY, 1, SHORT_ARR_OFF);
@@ -980,7 +980,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
     /** {@inheritDoc} */
     @Override
     public int[] readIntArray() {
-        if (BIG_ENDIAN) {
+        if (IS_BIG_ENDIAN) {
             return readArrayLittleEndian(INT_ARRAY, 4, 2, INT_ARR_OFF);
         } else {
             return readArray(INT_ARRAY, 2, INT_ARR_OFF);
@@ -990,7 +990,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
     /** {@inheritDoc} */
     @Override
     public long[] readLongArray() {
-        if (BIG_ENDIAN) {
+        if (IS_BIG_ENDIAN) {
             return readArrayLittleEndian(LONG_ARRAY, 8, 3, LONG_ARR_OFF);
         } else {
             return readArray(LONG_ARRAY, 3, LONG_ARR_OFF);
@@ -1000,7 +1000,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
     /** {@inheritDoc} */
     @Override
     public float[] readFloatArray() {
-        if (BIG_ENDIAN) {
+        if (IS_BIG_ENDIAN) {
             return readArrayLittleEndian(FLOAT_ARRAY, 4, 2, FLOAT_ARR_OFF);
         } else {
             return readArray(FLOAT_ARRAY, 2, FLOAT_ARR_OFF);
@@ -1010,7 +1010,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
     /** {@inheritDoc} */
     @Override
     public double[] readDoubleArray() {
-        if (BIG_ENDIAN) {
+        if (IS_BIG_ENDIAN) {
             return readArrayLittleEndian(DOUBLE_ARRAY, 8, 3, DOUBLE_ARR_OFF);
         } else {
             return readArray(DOUBLE_ARRAY, 3, DOUBLE_ARR_OFF);
@@ -1020,7 +1020,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
     /** {@inheritDoc} */
     @Override
     public char[] readCharArray() {
-        if (BIG_ENDIAN) {
+        if (IS_BIG_ENDIAN) {
             return readArrayLittleEndian(CHAR_ARRAY, 2, 1, CHAR_ARR_OFF);
         } else {
             return readArray(CHAR_ARRAY, 1, CHAR_ARR_OFF);
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index b6e7074472..ae57993494 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -30,10 +30,10 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Multi-versioned partition storage. Maps RowId to a structures called "Version Chains". Each version chain is logically a stack of
  * elements with the following structure:
- * <pre><code>[timestamp | txId, row data]</code></pre>
+ * <pre><code>[timestamp | transaction state (txId + commitTableId + commitPartitionId), row data]</code></pre>
  *
- * <p>Only the chain's head can contain a transaction id, every other element must have a timestamp. Presence of transaction id indicates
- * that the row is not yet committed.
+ * <p>Only the chain's head can contain a transaction state, every other element must have a timestamp. Presence of transaction state
+ * indicates that the row is not yet committed.
  *
  * <p>All timestamps in the chain must go in decreasing order, giving us a N2O (newest to oldest) order of search.
  *
@@ -111,7 +111,7 @@ public interface MvPartitionStorage extends AutoCloseable {
      * @throws StorageException If failed to read data from the storage.
      */
     @Nullable
-    BinaryRow read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException;
+    ReadResult read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException;
 
     /**
      * Reads the value from the storage as it was at the given timestamp.
@@ -120,7 +120,7 @@ public interface MvPartitionStorage extends AutoCloseable {
      */
     @Nullable
     @Deprecated
-    default BinaryRow read(RowId rowId, Timestamp timestamp) throws StorageException {
+    default ReadResult read(RowId rowId, Timestamp timestamp) throws StorageException {
         return read(rowId, convertTimestamp(timestamp));
     }
 
@@ -132,7 +132,7 @@ public interface MvPartitionStorage extends AutoCloseable {
      * @return Binary row that corresponds to the key or {@code null} if value is not found.
      */
     @Nullable
-    BinaryRow read(RowId rowId, HybridTimestamp timestamp) throws StorageException;
+    ReadResult read(RowId rowId, HybridTimestamp timestamp) throws StorageException;
 
     /**
      * Creates an uncommitted version, assigning a new row id to it.
@@ -142,28 +142,30 @@ public interface MvPartitionStorage extends AutoCloseable {
      * @return Row id.
      * @throws StorageException If failed to write data into the storage.
      *
-     * @deprecated Generates different ids for each replica. {@link #addWrite(RowId, BinaryRow, UUID)} with explicit replicated id must be
-     *      used instead.
+     * @deprecated Generates different ids for each replica. {@link #addWrite(RowId, BinaryRow, UUID, UUID, int)} with explicit replicated
+     *      id must be used instead.
      */
     @Deprecated
     RowId insert(BinaryRow binaryRow, UUID txId) throws StorageException;
 
     /**
-     * Creates (or replaces) an uncommitted (aka pending) version, assigned to the given transaction id.
-     * In details:
-     * - if there is no uncommitted version, a new uncommitted version is added
-     * - if there is an uncommitted version belonging to the same transaction, it gets replaced by the given version
-     * - if there is an uncommitted version belonging to a different transaction, {@link TxIdMismatchException} is thrown
+     * Creates (or replaces) an uncommitted (aka pending) version, assigned to the given transaction id. In details:
+     * if there is no uncommitted version, a new uncommitted version is added;
+     * if there is an uncommitted version belonging to the same transaction, it gets replaced by the given version;
+     * if there is an uncommitted version belonging to a different transaction, {@link TxIdMismatchException} is thrown.
      *
      * @param rowId Row id.
      * @param row Binary row to update. Key only row means value removal.
      * @param txId Transaction id.
+     * @param commitTableId Commit table id.
+     * @param commitPartitionId Commit partitionId.
      * @return Previous uncommitted row version associated with the row id, or {@code null} if no uncommitted version
      *     exists before this call
      * @throws TxIdMismatchException If there's another pending update associated with different transaction id.
      * @throws StorageException If failed to write data to the storage.
      */
-    @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId) throws TxIdMismatchException, StorageException;
+    @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, UUID commitTableId, int commitPartitionId)
+            throws TxIdMismatchException, StorageException;
 
     /**
      * Aborts a pending update of the ongoing uncommitted transaction. Invoked during rollback.
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
new file mode 100644
index 0000000000..ee20c11ad9
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.util.UUID;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link MvPartitionStorage#read} result.
+ */
+public class ReadResult {
+    /** Unset commit partition id value. */
+    public static final int UNDEFINED_COMMIT_PARTITION_ID = -1;
+
+    /** Data. */
+    private final BinaryRow binaryRow;
+
+    /** Transaction id. Not {@code null} iff this is a write-intent. */
+    private final @Nullable UUID transactionId;
+
+    /** Commit table id. Not {@code null} iff this is a write-intent. */
+    private final @Nullable UUID commitTableId;
+
+    /** Commit partition id. If this is not a write-intent it is equal to {@link #UNDEFINED_COMMIT_PARTITION_ID}. */
+    private final int commitPartitionId;
+
+    /**
+     * Timestamp of the newest commit of the data. Not {@code null} iff committed version exists, this is a
+     * write-intent and read was made with a timestamp.
+     */
+    private final @Nullable HybridTimestamp newestCommitTs;
+
+    private ReadResult(BinaryRow binaryRow, @Nullable UUID transactionId, @Nullable UUID commitTableId,
+            @Nullable HybridTimestamp newestCommitTs, int commitPartitionId) {
+        this.binaryRow = binaryRow;
+
+        // If transaction is not null, then commitTableId and commitPartitionId should be defined.
+        assert (transactionId == null) || (commitTableId != null && commitPartitionId != -1);
+
+        // If transaction id is null, then commitTableId and commitPartitionId should not be defined.
+        assert (transactionId != null) || (commitTableId == null && commitPartitionId == -1);
+
+        this.transactionId = transactionId;
+        this.commitTableId = commitTableId;
+        this.newestCommitTs = newestCommitTs;
+        this.commitPartitionId = commitPartitionId;
+    }
+
+    public static ReadResult createFromWriteIntent(BinaryRow binaryRow, UUID transactionId, UUID commitTableId,
+            @Nullable HybridTimestamp lastCommittedTimestamp, int commitPartitionId) {
+        return new ReadResult(binaryRow, transactionId, commitTableId, lastCommittedTimestamp, commitPartitionId);
+    }
+
+    public static ReadResult createFromCommitted(BinaryRow binaryRow) {
+        return new ReadResult(binaryRow, null, null, null, UNDEFINED_COMMIT_PARTITION_ID);
+    }
+
+    /**
+     * Returns binary row representation of the data.
+     *
+     * @return Binary row representation of the data.
+     */
+    public BinaryRow binaryRow() {
+        return binaryRow;
+    }
+
+    /**
+     * Returns transaction id part of the transaction state if this is a write-intent,
+     * {@code null} otherwise.
+     *
+     * @return Transaction id part of the transaction state if this is a write-intent,
+     *         {@code null} otherwise.
+     */
+    public @Nullable UUID transactionId() {
+        return transactionId;
+    }
+
+    /**
+     * Returns commit table id part of the transaction state if this is a write-intent,
+     * {@code null} otherwise.
+     *
+     * @return Commit table id part of the transaction state if this is a write-intent,
+     *         {@code null} otherwise.
+     */
+    public @Nullable UUID commitTableId() {
+        return commitTableId;
+    }
+
+    /**
+     * Returns timestamp of the most recent commit of the row.
+     *
+     * @return Timestamp of the most recent commit of the row.
+     */
+    public @Nullable HybridTimestamp newestCommitTs() {
+        return newestCommitTs;
+    }
+
+    /**
+     * Returns commit partition id part of the transaction state if this is a write-intent,
+     * {@link #UNDEFINED_COMMIT_PARTITION_ID} otherwise.
+     *
+     * @return Commit partition id part of the transaction state if this is a write-intent,
+     *         {@link #UNDEFINED_COMMIT_PARTITION_ID} otherwise.
+     */
+    public int commitPartitionId() {
+        return commitPartitionId;
+    }
+}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 682f913079..1a4f3ee5e2 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -27,10 +27,12 @@ import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
@@ -44,12 +46,17 @@ import org.apache.ignite.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.tx.Timestamp;
 import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.Pair;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /**
  * Base test for MV partition storages.
  */
 public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest {
+    /** This mask is used to create a 64-bit unsigned integer from a {@code long}.  */
+    private static final BigInteger UNSIGNED_LONG_MASK = BigInteger.ONE.shiftLeft(Long.SIZE).subtract(BigInteger.ONE);
+
     /** A partition id that should be used to create a partition instance. */
     protected static final int PARTITION_ID = 1;
 
@@ -65,19 +72,32 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
     protected final BinaryRow binaryRow = binaryRow(key, value);
     private final TestValue value2 = new TestValue(21, "bar2");
     protected final BinaryRow binaryRow2 = binaryRow(key, value2);
+    private final BinaryRow binaryRow3 = binaryRow(key, new TestValue(22, "bar3"));
 
     /**
      * Reads a row inside of consistency closure.
      */
     protected BinaryRow read(RowId rowId, UUID txId) {
-        return storage.runConsistently(() -> storage.read(rowId, txId));
+        ReadResult readResult = storage.runConsistently(() -> storage.read(rowId, txId));
+
+        if (readResult == null) {
+            return null;
+        }
+
+        return readResult.binaryRow();
     }
 
     /**
      * Reads a row inside of consistency closure.
      */
     protected BinaryRow read(RowId rowId, HybridTimestamp timestamp) {
-        return storage.runConsistently(() -> storage.read(rowId, timestamp));
+        ReadResult readResult = storage.runConsistently(() -> storage.read(rowId, timestamp));
+
+        if (readResult == null) {
+            return null;
+        }
+
+        return readResult.binaryRow();
     }
 
     /**
@@ -105,7 +125,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
      * Adds/updates a write-intent inside of consistency closure.
      */
     protected BinaryRow addWrite(RowId rowId, BinaryRow binaryRow, UUID txId) {
-        return storage.runConsistently(() -> storage.addWrite(rowId, binaryRow, txId));
+        return storage.runConsistently(() -> storage.addWrite(rowId, binaryRow, txId, UUID.randomUUID(), 0));
     }
 
     /**
@@ -153,7 +173,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
     }
 
     /**
-     * Tests basic invariants of {@link MvPartitionStorage#addWrite(RowId, BinaryRow, UUID)}.
+     * Tests basic invariants of {@link MvPartitionStorage#addWrite(RowId, BinaryRow, UUID, UUID, int)}.
      */
     @Test
     public void testAddWrite() {
@@ -165,14 +185,14 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
         // Write from the same transaction.
         addWrite(rowId, binaryRow, txId);
 
-        // Read without timestamp returns uncommited row.
+        // Read without timestamp returns uncommitted row.
         assertRowMatches(read(rowId, txId), binaryRow);
 
         // Read with wrong transaction id should throw exception.
         assertThrows(TxIdMismatchException.class, () -> read(rowId, newTransactionId()));
 
         // Read with timestamp returns null.
-        assertNull(read(rowId, clock.now()));
+        assertRowMatches(read(rowId, clock.now()), binaryRow);
     }
 
     /**
@@ -222,11 +242,12 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
         assertRowMatches(read(rowId, newTxId), newRow);
 
         assertRowMatches(read(rowId, tsExact), binaryRow);
-        assertRowMatches(read(rowId, tsAfter), binaryRow);
-        assertRowMatches(read(rowId, clock.now()), binaryRow);
+        assertRowMatches(read(rowId, tsAfter), newRow);
+        assertRowMatches(read(rowId, clock.now()), newRow);
 
         // Only latest time behavior changes after commit.
-        commitWrite(rowId, clock.now());
+        HybridTimestamp newRowCommitTs = clock.now();
+        commitWrite(rowId, newRowCommitTs);
 
         assertRowMatches(read(rowId, newTxId), newRow);
 
@@ -246,8 +267,9 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
 
         assertRowMatches(read(rowId, tsExact), binaryRow);
         assertRowMatches(read(rowId, tsAfter), binaryRow);
+        assertRowMatches(read(rowId, newRowCommitTs), newRow);
 
-        assertRowMatches(read(rowId, clock.now()), newRow);
+        assertNull(read(rowId, clock.now()));
 
         // Commit remove.
         HybridTimestamp removeTs = clock.now();
@@ -490,7 +512,19 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
     }
 
     @Test
-    void readByTimestampIgnoresUncommittedVersion() {
+    void readByTimestampAfterWriteFindsUncommittedVersion() {
+        RowId rowId = new RowId(PARTITION_ID);
+
+        addWrite(rowId, binaryRow, newTransactionId());
+
+        HybridTimestamp latestTs = clock.now();
+        BinaryRow foundRow = read(rowId, latestTs);
+
+        assertRowMatches(foundRow, binaryRow);
+    }
+
+    @Test
+    void readByTimestampAfterCommitAndWriteFindsUncommittedVersion() {
         RowId rowId = insert(binaryRow, newTransactionId());
         commitWrite(rowId, clock.now());
 
@@ -499,7 +533,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
         HybridTimestamp latestTs = clock.now();
         BinaryRow foundRow = read(rowId, latestTs);
 
-        assertRowMatches(foundRow, binaryRow);
+        assertRowMatches(foundRow, binaryRow2);
     }
 
     @Test
@@ -678,9 +712,52 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
     void readByTimestampWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite() {
         RowId rowId = commitAbortAndAddUncommitted();
 
-        BinaryRow foundRow = storage.read(rowId, clock.now());
+        BinaryRow foundRow = storage.read(rowId, clock.now()).binaryRow();
 
-        assertRowMatches(foundRow, binaryRow);
+        // We see the uncommitted row.
+        assertRowMatches(foundRow, binaryRow3);
+    }
+
+    @Test
+    void readByTimestampBeforeAndAfterUncommittedWrite() {
+        RowId rowId = new RowId(PARTITION_ID);
+
+        HybridTimestamp commitTs = clock.now();
+
+        storage.runConsistently(() -> {
+            storage.addWrite(rowId, binaryRow, UUID.randomUUID(), UUID.randomUUID(), 5);
+
+            storage.commitWrite(rowId, commitTs);
+            return null;
+        });
+
+        UUID txId2 = UUID.randomUUID();
+        UUID commitTableId2 = UUID.randomUUID();
+        int commitPartitionId2 = 1338;
+
+        storage.runConsistently(() -> {
+            storage.addWrite(rowId, binaryRow2, txId2, commitTableId2, commitPartitionId2);
+
+            return null;
+        });
+
+        ReadResult res = storage.read(rowId, commitTs);
+
+        assertNotNull(res);
+
+        assertNull(res.transactionId());
+        assertNull(res.commitTableId());
+        assertEquals(-1, res.commitPartitionId());
+        assertRowMatches(res.binaryRow(), binaryRow);
+
+        res = storage.read(rowId, clock.now());
+
+        assertNotNull(res);
+
+        assertEquals(txId2, res.transactionId());
+        assertEquals(commitTableId2, res.commitTableId());
+        assertEquals(commitPartitionId2, res.commitPartitionId());
+        assertRowMatches(res.binaryRow(), binaryRow2);
     }
 
     private RowId commitAbortAndAddUncommitted() {
@@ -689,25 +766,24 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
 
             storage.commitWrite(rowId, clock.now());
 
-            storage.addWrite(rowId, binaryRow2, newTransactionId());
+            storage.addWrite(rowId, binaryRow2, newTransactionId(), UUID.randomUUID(), 0);
             storage.abortWrite(rowId);
 
-            BinaryRow binaryRow3 = binaryRow(key, new TestValue(22, "bar3"));
-
-            storage.addWrite(rowId, binaryRow3, newTransactionId());
+            storage.addWrite(rowId, binaryRow3, newTransactionId(), UUID.randomUUID(), 0);
 
             return rowId;
         });
     }
 
     @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17720")
     void scanByTimestampWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite() throws Exception {
         commitAbortAndAddUncommitted();
 
         try (Cursor<BinaryRow> cursor = storage.scan(k -> true, clock.now())) {
             BinaryRow foundRow = cursor.next();
 
-            assertRowMatches(foundRow, binaryRow);
+            assertRowMatches(foundRow, binaryRow3);
 
             assertFalse(cursor.hasNext());
         }
@@ -719,7 +795,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
 
         BinaryRow foundRow = read(rowId, clock.now());
 
-        assertThat(foundRow, is(nullValue()));
+        assertRowMatches(foundRow, binaryRow);
     }
 
     /**
@@ -746,4 +822,184 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
 
         assertEquals(1, storage.persistedIndex());
     }
+
+    @Test
+    void testReadWithinBeforeAndAfterTwoCommits() {
+        HybridTimestamp before = clock.now();
+
+        RowId rowId = new RowId(PARTITION_ID);
+
+        UUID txId = UUID.randomUUID();
+        UUID commitTableId = UUID.randomUUID();
+        int commitPartitionId = 1337;
+
+        HybridTimestamp first = clock.now();
+
+        storage.runConsistently(() -> {
+            storage.addWrite(rowId, binaryRow, txId, commitTableId, commitPartitionId);
+
+            storage.commitWrite(rowId, first);
+            return null;
+        });
+
+        HybridTimestamp betweenCommits = clock.now();
+
+        HybridTimestamp second = clock.now();
+
+        storage.runConsistently(() -> {
+            storage.addWrite(rowId, binaryRow2, UUID.randomUUID(), UUID.randomUUID(), commitPartitionId + 1);
+
+            storage.commitWrite(rowId, second);
+            return null;
+        });
+
+        storage.runConsistently(() -> {
+            storage.addWrite(rowId, binaryRow3, UUID.randomUUID(), UUID.randomUUID(), commitPartitionId + 2);
+
+            return null;
+        });
+
+        HybridTimestamp after = clock.now();
+
+        // Read before commits.
+        ReadResult res = storage.read(rowId, before);
+        assertNull(res);
+
+        // Read at exact time of first commit.
+        res = storage.read(rowId, first);
+
+        assertNotNull(res);
+        assertNull(res.newestCommitTs());
+        assertRowMatches(res.binaryRow(), binaryRow);
+
+        // Read between two commits.
+        res = storage.read(rowId, betweenCommits);
+
+        assertNotNull(res);
+        assertNull(res.newestCommitTs());
+        assertRowMatches(res.binaryRow(), binaryRow);
+
+        // Read at exact time of second commit.
+        res = storage.read(rowId, second);
+
+        assertNotNull(res);
+        assertNull(res.newestCommitTs());
+        assertRowMatches(res.binaryRow(), binaryRow2);
+
+        // Read after second commit (write intent).
+        res = storage.read(rowId, after);
+
+        assertNotNull(res);
+        assertNotNull(res.newestCommitTs());
+        assertEquals(second, res.newestCommitTs());
+        assertRowMatches(res.binaryRow(), binaryRow3);
+    }
+
+    @Test
+    void testWrongPartition() {
+        RowId rowId = commitAbortAndAddUncommitted();
+
+        var row = new RowId(rowId.partitionId() + 1, rowId.mostSignificantBits(), rowId.leastSignificantBits());
+
+        assertNull(read(row, clock.now()));
+    }
+
+    @Test
+    void testReadingNothingWithLowerRowIdIfHigherRowIdWritesExist() {
+        RowId rowId = commitAbortAndAddUncommitted();
+
+        RowId lowerRowId = getPreviousRowId(rowId);
+
+        assertNull(read(lowerRowId, clock.now()));
+    }
+
+    @Test
+    void testReadingNothingByTxIdWithLowerRowId() {
+        RowId higherRowId = new RowId(PARTITION_ID);
+        RowId lowerRowId = getPreviousRowId(higherRowId);
+
+        UUID txId = UUID.randomUUID();
+
+        storage.runConsistently(() -> {
+            UUID tableId = UUID.randomUUID();
+
+            storage.addWrite(higherRowId, binaryRow, txId, tableId, PARTITION_ID);
+
+            return null;
+        });
+
+        assertNull(read(lowerRowId, txId));
+    }
+
+    @Test
+    void testReadingCorrectWriteIntentByTimestampIfLowerRowIdWriteIntentExists() {
+        RowId higherRowId = new RowId(PARTITION_ID);
+        RowId lowerRowId = getPreviousRowId(higherRowId);
+
+        UUID txId = UUID.randomUUID();
+
+        storage.runConsistently(() -> {
+            UUID tableId = UUID.randomUUID();
+
+            storage.addWrite(lowerRowId, binaryRow2, UUID.randomUUID(), UUID.randomUUID(), PARTITION_ID);
+            storage.addWrite(higherRowId, binaryRow, txId, tableId, PARTITION_ID);
+
+            storage.commitWrite(higherRowId, clock.now());
+
+            return null;
+        });
+
+        assertRowMatches(read(higherRowId, clock.now()), binaryRow);
+    }
+
+    @Test
+    void testReadingCorrectWriteIntentByTimestampIfHigherRowIdWriteIntentExists() {
+        RowId higherRowId = new RowId(PARTITION_ID);
+        RowId lowerRowId = getPreviousRowId(higherRowId);
+
+        storage.runConsistently(() -> {
+            UUID txId = UUID.randomUUID();
+            UUID tableId = UUID.randomUUID();
+
+            storage.addWrite(lowerRowId, binaryRow, txId, tableId, PARTITION_ID);
+            storage.addWrite(higherRowId, binaryRow2, txId, tableId, PARTITION_ID);
+
+            return null;
+        });
+
+        assertRowMatches(read(lowerRowId, clock.now()), binaryRow);
+    }
+
+    /**
+     * Returns row id that is lexicographically smaller (by the value of one) than the argument.
+     *
+     * @param value Row id.
+     * @return Row id value minus 1.
+     */
+    private RowId getPreviousRowId(RowId value) {
+        Pair<Long, Long> previous128Uint = getPrevious128Uint(value.mostSignificantBits(), value.leastSignificantBits());
+
+        return new RowId(value.partitionId(), previous128Uint.getFirst(), previous128Uint.getSecond());
+    }
+
+    /**
+     * Performs a decrement operation on a 128-bit unsigned value that is represented by two longs.
+     *
+     * @param msb Most significant bytes of 128-bit unsigned integer.
+     * @param lsb Least significant bytes of 128-bit unsigned integer.
+     * @return The value decremented by one.
+     */
+    private Pair<Long, Long> getPrevious128Uint(long msb, long lsb) {
+        BigInteger unsignedMsb = BigInteger.valueOf(msb).and(UNSIGNED_LONG_MASK);
+        BigInteger unsignedLsb = BigInteger.valueOf(lsb).and(UNSIGNED_LONG_MASK);
+
+        BigInteger biRepresentation = unsignedMsb.shiftLeft(Long.SIZE).add(unsignedLsb);
+
+        BigInteger lesserBi = biRepresentation.subtract(BigInteger.ONE);
+
+        long newMsb = lesserBi.shiftRight(Long.SIZE).and(UNSIGNED_LONG_MASK).longValue();
+        long newLsb = lesserBi.and(UNSIGNED_LONG_MASK).longValue();
+
+        return new Pair<>(newMsb, newLsb);
+    }
 }
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 5efaee4e95..74114db134 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -120,7 +120,7 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
 
         RowId rowId0 = partitionStorage0.runConsistently(() -> partitionStorage0.insert(testData0, txId));
 
-        assertThat(unwrap(partitionStorage0.read(rowId0, txId)), is(equalTo(unwrap(testData0))));
+        assertThat(unwrap(partitionStorage0.read(rowId0, txId).binaryRow()), is(equalTo(unwrap(testData0))));
         assertThat(partitionStorage1.read(rowId0, txId), is(nullValue()));
 
         var testData1 = binaryRow(new TestKey(2, "2"), new TestValue(20, "20"));
@@ -128,7 +128,7 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
         RowId rowId1 = partitionStorage1.runConsistently(() -> partitionStorage1.insert(testData1, txId));
 
         assertThat(partitionStorage0.read(rowId1, txId), is(nullValue()));
-        assertThat(unwrap(partitionStorage1.read(rowId1, txId)), is(equalTo(unwrap(testData1))));
+        assertThat(unwrap(partitionStorage1.read(rowId1, txId).binaryRow()), is(equalTo(unwrap(testData1))));
 
         assertThat(toList(partitionStorage0.scan(row -> true, txId)), contains(unwrap(testData0)));
         assertThat(toList(partitionStorage1.scan(row -> true, txId)), contains(unwrap(testData1)));
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
index 7ebd2c91c0..f7d99993f1 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
@@ -29,10 +29,12 @@ import java.util.function.Predicate;
 import org.apache.ignite.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
 import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -51,27 +53,34 @@ public class TestConcurrentHashMapMvPartitionStorage implements MvPartitionStora
 
     private static class VersionChain {
         final @Nullable BinaryRow row;
-        final @Nullable HybridTimestamp begin;
+        final @Nullable HybridTimestamp ts;
         final @Nullable UUID txId;
+        final @Nullable UUID commitTableId;
+        final int commitPartitionId;
         final @Nullable VersionChain next;
 
-        VersionChain(@Nullable BinaryRow row, @Nullable HybridTimestamp begin, @Nullable UUID txId, @Nullable VersionChain next) {
+        VersionChain(@Nullable BinaryRow row, @Nullable HybridTimestamp ts, @Nullable UUID txId, @Nullable UUID commitTableId,
+                int commitPartitionId, @Nullable VersionChain next) {
             this.row = row;
-            this.begin = begin;
+            this.ts = ts;
             this.txId = txId;
+            this.commitTableId = commitTableId;
+            this.commitPartitionId = commitPartitionId;
             this.next = next;
         }
 
-        static VersionChain createUncommitted(@Nullable BinaryRow row, @Nullable UUID txId, @Nullable VersionChain next) {
-            return new VersionChain(row, null, txId, next);
+        static VersionChain forWriteIntent(@Nullable BinaryRow row, @Nullable UUID txId, @Nullable UUID commitTableId,
+                int commitPartitionId, @Nullable VersionChain next) {
+            return new VersionChain(row, null, txId, commitTableId, commitPartitionId, next);
         }
 
-        static VersionChain createCommitted(@Nullable HybridTimestamp timestamp, VersionChain uncommittedVersionChain) {
-            return new VersionChain(uncommittedVersionChain.row, timestamp, null, uncommittedVersionChain.next);
+        static VersionChain forCommitted(@Nullable HybridTimestamp timestamp, VersionChain uncommittedVersionChain) {
+            return new VersionChain(uncommittedVersionChain.row, timestamp, null, null,
+                    ReadResult.UNDEFINED_COMMIT_PARTITION_ID, uncommittedVersionChain.next);
         }
 
-        boolean notContainsWriteIntent() {
-            return begin != null && txId == null;
+        boolean notWriteIntent() {
+            return ts != null && txId == null;
         }
     }
 
@@ -112,28 +121,29 @@ public class TestConcurrentHashMapMvPartitionStorage implements MvPartitionStora
     public RowId insert(BinaryRow row, UUID txId) throws StorageException {
         RowId rowId = new RowId(partitionId);
 
-        addWrite(rowId, row, txId);
+        addWrite(rowId, row, txId, UUID.randomUUID(), 0);
 
         return rowId;
     }
 
     /** {@inheritDoc} */
     @Override
-    public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId) throws TxIdMismatchException {
+    public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, UUID commitTableId, int commitPartitionId)
+            throws TxIdMismatchException {
         BinaryRow[] res = {null};
 
         map.compute(rowId, (ignored, versionChain) -> {
-            if (versionChain != null && versionChain.begin == null) {
+            if (versionChain != null && versionChain.ts == null) {
                 if (!txId.equals(versionChain.txId)) {
                     throw new TxIdMismatchException(txId, versionChain.txId);
                 }
 
                 res[0] = versionChain.row;
 
-                return VersionChain.createUncommitted(row, txId, versionChain.next);
+                return VersionChain.forWriteIntent(row, txId, commitTableId, commitPartitionId, versionChain.next);
             }
 
-            return VersionChain.createUncommitted(row, txId, versionChain);
+            return VersionChain.forWriteIntent(row, txId, commitTableId, commitPartitionId, versionChain);
         });
 
         return res[0];
@@ -145,11 +155,11 @@ public class TestConcurrentHashMapMvPartitionStorage implements MvPartitionStora
         BinaryRow[] res = {null};
 
         map.computeIfPresent(rowId, (ignored, versionChain) -> {
-            if (versionChain.notContainsWriteIntent()) {
+            if (versionChain.notWriteIntent()) {
                 return versionChain;
             }
 
-            assert versionChain.begin == null;
+            assert versionChain.ts == null;
 
             res[0] = versionChain.row;
 
@@ -165,17 +175,17 @@ public class TestConcurrentHashMapMvPartitionStorage implements MvPartitionStora
         map.compute(rowId, (ignored, versionChain) -> {
             assert versionChain != null;
 
-            if (versionChain.notContainsWriteIntent()) {
+            if (versionChain.notWriteIntent()) {
                 return versionChain;
             }
 
-            return VersionChain.createCommitted(timestamp, versionChain);
+            return VersionChain.forCommitted(timestamp, versionChain);
         });
     }
 
     /** {@inheritDoc} */
     @Override
-    public @Nullable BinaryRow read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException {
+    public @Nullable ReadResult read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException {
         VersionChain versionChain = map.get(rowId);
 
         return read(versionChain, null, txId, null);
@@ -183,14 +193,14 @@ public class TestConcurrentHashMapMvPartitionStorage implements MvPartitionStora
 
     /** {@inheritDoc} */
     @Override
-    public @Nullable BinaryRow read(RowId rowId, @Nullable HybridTimestamp timestamp) {
+    public @Nullable ReadResult read(RowId rowId, @Nullable HybridTimestamp timestamp) {
         VersionChain versionChain = map.get(rowId);
 
         return read(versionChain, timestamp, null, null);
     }
 
     @Nullable
-    private static BinaryRow read(
+    private static ReadResult read(
             VersionChain versionChain,
             @Nullable HybridTimestamp timestamp,
             @Nullable UUID txId,
@@ -213,27 +223,85 @@ public class TestConcurrentHashMapMvPartitionStorage implements MvPartitionStora
                 throw new TxIdMismatchException(txId, versionChain.txId);
             }
 
-            return binaryRow;
+            boolean isWriteIntent = versionChain.ts == null;
+
+            if (isWriteIntent) {
+                return ReadResult.createFromWriteIntent(
+                        binaryRow,
+                        versionChain.txId,
+                        versionChain.commitTableId,
+                        versionChain.next != null ? versionChain.next.ts : null,
+                        versionChain.commitPartitionId
+                );
+            }
+
+            return ReadResult.createFromCommitted(binaryRow);
         }
 
         VersionChain cur = versionChain;
 
-        if (cur.begin == null) {
+        boolean hasWriteIntent = false;
+
+        if (cur.ts == null) {
+            hasWriteIntent = true;
+
+            if (cur.next == null) {
+                // We only have a write-intent.
+                BinaryRow binaryRow = cur.row;
+
+                if (filter != null && !filter.test(binaryRow)) {
+                    return null;
+                }
+
+                return ReadResult.createFromWriteIntent(binaryRow, cur.txId, cur.commitTableId, null,
+                        cur.commitPartitionId);
+            }
+
             cur = cur.next;
         }
 
+        return walkVersionChain(versionChain, timestamp, filter, cur, hasWriteIntent);
+    }
+
+    @Nullable
+    private static ReadResult walkVersionChain(VersionChain chainHead, @NotNull HybridTimestamp timestamp,
+            @Nullable Predicate<BinaryRow> filter, VersionChain firstCommit, boolean hasWriteIntent) {
+        boolean isLatestCommit = true;
+
+        VersionChain cur = firstCommit;
+
         while (cur != null) {
-            if (timestamp.compareTo(cur.begin) >= 0) {
+            int compareResult = timestamp.compareTo(cur.ts);
+
+            if (compareResult >= 0) {
+                if (isLatestCommit && hasWriteIntent && (compareResult != 0)) {
+                    // It's the latest commit in chain, query ts is greater than commit ts and there is a write-intent.
+                    // So we just return write-intent.
+                    BinaryRow binaryRow = chainHead.row;
+
+                    if (filter != null && !filter.test(binaryRow)) {
+                        return null;
+                    }
+
+                    HybridTimestamp latestCommitTs = chainHead.next != null ? chainHead.next.ts : null;
+
+                    return ReadResult.createFromWriteIntent(binaryRow, chainHead.txId, chainHead.commitTableId, latestCommitTs,
+                            chainHead.commitPartitionId);
+                }
+
+                // This is the most recent commit at or before the query ts, so it is the one we are looking for.
                 BinaryRow binaryRow = cur.row;
 
                 if (filter != null && !filter.test(binaryRow)) {
                     return null;
                 }
 
-                return binaryRow;
+                return ReadResult.createFromCommitted(binaryRow);
             }
 
             cur = cur.next;
+
+            isLatestCommit = false;
         }
 
         return null;
@@ -245,6 +313,7 @@ public class TestConcurrentHashMapMvPartitionStorage implements MvPartitionStora
         Iterator<BinaryRow> iterator = map.values().stream()
                 .map(versionChain -> read(versionChain, null, txId, filter))
                 .filter(Objects::nonNull)
+                .map(ReadResult::binaryRow)
                 .iterator();
 
         return Cursor.fromIterator(iterator);
@@ -256,6 +325,7 @@ public class TestConcurrentHashMapMvPartitionStorage implements MvPartitionStora
         Iterator<BinaryRow> iterator = map.values().stream()
                 .map(versionChain -> read(versionChain, timestamp, null, filter))
                 .filter(Objects::nonNull)
+                .map(ReadResult::binaryRow)
                 .iterator();
 
         return Cursor.fromIterator(iterator);
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 9deb69241f..ed17aea627 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
@@ -201,7 +202,11 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
 
     /** {@inheritDoc} */
     @Override
-    public @Nullable BinaryRow read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException {
+    public @Nullable ReadResult read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException {
+        if (rowId.partitionId() != partitionId) {
+            return null;
+        }
+
         VersionChain versionChain = findVersionChain(rowId);
 
         if (versionChain == null) {
@@ -213,7 +218,11 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
 
     /** {@inheritDoc} */
     @Override
-    public @Nullable BinaryRow read(RowId rowId, HybridTimestamp timestamp) throws StorageException {
+    public @Nullable ReadResult read(RowId rowId, HybridTimestamp timestamp) throws StorageException {
+        if (rowId.partitionId() != partitionId) {
+            return null;
+        }
+
         VersionChain versionChain = findVersionChain(rowId);
 
         if (versionChain == null) {
@@ -231,7 +240,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         }
     }
 
-    private @Nullable ByteBufferRow findLatestRowVersion(VersionChain versionChain, UUID txId, Predicate<BinaryRow> keyFilter) {
+    private @Nullable ReadResult findLatestRowVersion(VersionChain versionChain, UUID txId, Predicate<BinaryRow> keyFilter) {
         RowVersion rowVersion = readRowVersion(versionChain.headLink(), ALWAYS_LOAD_VALUE);
 
         ByteBufferRow row = rowVersionToBinaryRow(rowVersion);
@@ -240,9 +249,23 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
             return null;
         }
 
-        throwIfChainBelongsToAnotherTx(versionChain, txId);
+        if (versionChain.isUncommitted()) {
+            UUID chainTxId = versionChain.transactionId();
+
+            assert chainTxId != null;
+
+            throwIfChainBelongsToAnotherTx(versionChain, txId);
+
+            return ReadResult.createFromWriteIntent(
+                    row,
+                    versionChain.transactionId(),
+                    versionChain.commitTableId(),
+                    null,
+                    versionChain.commitPartitionId()
+            );
+        }
 
-        return row;
+        return ReadResult.createFromCommitted(row);
     }
 
     private RowVersion readRowVersion(long nextLink, Predicate<HybridTimestamp> loadValue) {
@@ -258,7 +281,9 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
     }
 
     private void throwIfChainBelongsToAnotherTx(VersionChain versionChain, UUID txId) {
-        if (versionChain.transactionId() != null && !txId.equals(versionChain.transactionId())) {
+        assert versionChain.isUncommitted();
+
+        if (!txId.equals(versionChain.transactionId())) {
             throw new TxIdMismatchException(txId, versionChain.transactionId());
         }
     }
@@ -271,7 +296,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         return new ByteBufferRow(rowVersion.value());
     }
 
-    private @Nullable ByteBufferRow findRowVersionInChain(
+    private @Nullable BinaryRow findRowVersionInChain(
             VersionChain versionChain,
             @Nullable UUID transactionId,
             @Nullable HybridTimestamp timestamp,
@@ -280,38 +305,65 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         assert transactionId != null ^ timestamp != null;
 
         if (transactionId != null) {
-            return findLatestRowVersion(versionChain, transactionId, keyFilter);
+            ReadResult res = findLatestRowVersion(versionChain, transactionId, keyFilter);
+
+            if (res == null) {
+                return null;
+            }
+
+            return res.binaryRow();
         } else {
-            ByteBufferRow row = findRowVersionByTimestamp(versionChain, timestamp);
+            ReadResult res = findRowVersionByTimestamp(versionChain, timestamp);
+
+            if (res == null) {
+                return null;
+            }
+
+            BinaryRow row = res.binaryRow();
 
             return keyFilter.test(row) ? row : null;
         }
     }
 
-    private @Nullable ByteBufferRow findRowVersionByTimestamp(VersionChain versionChain, HybridTimestamp timestamp) {
-        if (!versionChain.hasCommittedVersions()) {
-            return null;
-        }
-
-        long newestCommittedLink = versionChain.newestCommittedLink();
+    private @Nullable ReadResult findRowVersionByTimestamp(VersionChain versionChain, HybridTimestamp timestamp) {
+        long headLink = versionChain.headLink();
 
         ScanVersionChainByTimestamp scanByTimestamp = new ScanVersionChainByTimestamp(partitionId);
 
         try {
-            rowVersionDataPageReader.traverse(newestCommittedLink, scanByTimestamp, timestamp);
+            rowVersionDataPageReader.traverse(headLink, scanByTimestamp, timestamp);
         } catch (IgniteInternalCheckedException e) {
             throw new StorageException("Cannot search for a row version", e);
         }
 
-        return scanByTimestamp.result();
+        ByteBufferRow resultRow = scanByTimestamp.result();
+
+        if (resultRow == null) {
+            return null;
+        }
+
+        if (scanByTimestamp.isResultWriteIntent()) {
+            // If we returned write-intent, then version chain must be uncommitted.
+            assert versionChain.isUncommitted();
+
+            UUID transactionId = versionChain.transactionId();
+            UUID commitTableId = versionChain.commitTableId();
+            int commitPartitionId = versionChain.commitPartitionId();
+
+            return ReadResult.createFromWriteIntent(resultRow, transactionId, commitTableId, scanByTimestamp.lastCommittedTimestamp(),
+                    commitPartitionId);
+        }
+
+        return ReadResult.createFromCommitted(resultRow);
     }
 
     /** {@inheritDoc} */
+    @Deprecated
     @Override
     public RowId insert(BinaryRow row, UUID txId) throws StorageException {
         RowId rowId = new RowId(partitionId);
 
-        addWrite(rowId, row, txId);
+        addWrite(rowId, row, txId, UUID.randomUUID(), 0);
 
         return rowId;
     }
@@ -337,21 +389,26 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
 
     /** {@inheritDoc} */
     @Override
-    public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId) throws TxIdMismatchException, StorageException {
+    public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, UUID commitTableId, int commitPartitionId)
+            throws TxIdMismatchException, StorageException {
+        assert rowId.partitionId() == partitionId : rowId;
 
         VersionChain currentChain = findVersionChain(rowId);
 
         if (currentChain == null) {
             RowVersion newVersion = insertRowVersion(row, NULL_LINK);
 
-            VersionChain versionChain = new VersionChain(rowId, txId, newVersion.link(), NULL_LINK);
+            VersionChain versionChain = VersionChain.createUncommitted(rowId, txId, commitTableId, commitPartitionId, newVersion.link(),
+                    NULL_LINK);
 
             updateVersionChain(versionChain);
 
             return null;
         }
 
-        throwIfChainBelongsToAnotherTx(currentChain, txId);
+        if (currentChain.isUncommitted()) {
+            throwIfChainBelongsToAnotherTx(currentChain, txId);
+        }
 
         RowVersion newVersion = insertRowVersion(row, currentChain.newestCommittedLink());
 
@@ -366,7 +423,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
             removeRowVersion(currentVersion);
         }
 
-        VersionChain chainReplacement = new VersionChain(rowId, txId, newVersion.link(), newVersion.nextLink());
+        VersionChain chainReplacement = VersionChain.createUncommitted(rowId, txId, commitTableId, commitPartitionId, newVersion.link(),
+                newVersion.nextLink());
 
         updateVersionChain(chainReplacement);
 
@@ -376,6 +434,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
     /** {@inheritDoc} */
     @Override
     public @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException {
+        assert rowId.partitionId() == partitionId : rowId;
+
         VersionChain currentVersionChain = findVersionChain(rowId);
 
         if (currentVersionChain == null || currentVersionChain.transactionId() == null) {
@@ -393,7 +453,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
             // Next can be safely replaced with any value (like 0), because this field is only used when there
             // is some uncommitted value, but when we add an uncommitted value, we 'fix' such placeholder value
             // (like 0) by replacing it with a valid value.
-            VersionChain versionChainReplacement = new VersionChain(rowId, null, latestVersion.nextLink(), NULL_LINK);
+            VersionChain versionChainReplacement = VersionChain.createCommitted(rowId, latestVersion.nextLink(), NULL_LINK);
 
             updateVersionChain(versionChainReplacement);
         } else {
@@ -415,6 +475,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
     /** {@inheritDoc} */
     @Override
     public void commitWrite(RowId rowId, HybridTimestamp timestamp) throws StorageException {
+        assert rowId.partitionId() == partitionId : rowId;
+
         VersionChain currentVersionChain = findVersionChain(rowId);
 
         if (currentVersionChain == null || currentVersionChain.transactionId() == null) {
@@ -431,9 +493,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         }
 
         try {
-            VersionChain updatedVersionChain = new VersionChain(
+            VersionChain updatedVersionChain = VersionChain.createCommitted(
                     currentVersionChain.rowId(),
-                    null,
                     currentVersionChain.headLink(),
                     currentVersionChain.nextLink()
             );
@@ -564,7 +625,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
                 }
 
                 VersionChain chain = getCurrentChainFromTreeCursor();
-                ByteBufferRow row = findRowVersionInChain(chain, transactionId, timestamp, keyFilter);
+                BinaryRow row = findRowVersionInChain(chain, transactionId, timestamp, keyFilter);
 
                 if (row != null) {
                     nextRow = row;
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
index 60425a0402..4c9fed0db5 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
@@ -48,7 +48,7 @@ public final class RowVersion implements Storable {
 
     private long link;
 
-    private final @Nullable HybridTimestamp timestamp;
+    private final @Nullable HybridTimestamp timestamp; // Null means an uncommitted write intent (treated as +INF).
 
     private final long nextLink;
 
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java
index 7d5fd1616e..c23404c56a 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java
@@ -36,16 +36,26 @@ import org.jetbrains.annotations.Nullable;
 class ScanVersionChainByTimestamp implements PageMemoryTraversal<HybridTimestamp> {
     private final int partitionId;
 
+    private final ReadRowVersionValue readRowVersionValue = new ReadRowVersionValue();
+
     /** Contains the result when the traversal ends. */
     private @Nullable ByteBufferRow result;
 
     /**
      * First it's {@code true} (this means that we traverse first slots of versions of the Version Chain using NextLink);
-     * then it's {@code false} (when we found the version we need and we read its value).
+     * then it's {@code false} (when we found the version we need, and we read its value).
      */
     private boolean lookingForVersion = true;
 
-    private final ReadRowVersionValue readRowVersionValue = new ReadRowVersionValue();
+    private boolean hasCommittedRow;
+
+    private boolean hasWriteIntent = false;
+
+    private long writeIntentLink;
+
+    private boolean returnWriteIntent = false;
+
+    private HybridTimestamp lastCommittedTimestamp;
 
     ScanVersionChainByTimestamp(int partitionId) {
         this.partitionId = partitionId;
@@ -57,13 +67,57 @@ class ScanVersionChainByTimestamp implements PageMemoryTraversal<HybridTimestamp
         if (lookingForVersion) {
             HybridTimestamp rowVersionTs = HybridTimestamps.readTimestamp(pageAddr, payload.offset() + RowVersion.TIMESTAMP_OFFSET);
 
-            if (rowTimestampMatches(rowVersionTs, timestamp)) {
-                return readFullyOrStartReadingFragmented(link, pageAddr, payload);
-            } else {
-                return advanceToNextVersion(pageAddr, payload);
+            boolean isWriteIntent = rowVersionTs == null;
+
+            if (!hasWriteIntent && isWriteIntent) {
+                hasWriteIntent = true;
+                writeIntentLink = link;
             }
+
+            if (rowVersionTs != null) {
+                boolean isFirstCommittedRow = false;
+
+                if (!hasCommittedRow) {
+                    hasCommittedRow = true;
+                    isFirstCommittedRow = true;
+                    lastCommittedTimestamp = rowVersionTs;
+                }
+
+                int compare = rowVersionTs.compareTo(timestamp);
+
+                if (compare == 0) {
+                    // This is exactly the row that we are looking for.
+                    return readFullyOrStartReadingFragmented(link, pageAddr, payload);
+                }
+
+                if (compare < 0) {
+                    // This row is older than timestamp we are looking for.
+                    if (hasWriteIntent && isFirstCommittedRow) {
+                        // If this is the first committed row, but we have write-intent, return write-intent.
+                        lookingForVersion = false;
+                        returnWriteIntent = true;
+
+                        return writeIntentLink;
+                    }
+
+                    // We are in range between older and newer row, return older row.
+                    return readFullyOrStartReadingFragmented(link, pageAddr, payload);
+                }
+            }
+
+            long nextVersionLink = nextVersionLink(pageAddr, payload);
+
+            if (nextVersionLink == STOP_TRAVERSAL && isWriteIntent) {
+                // There are no committed versions, only a write intent, so return the write intent.
+                lookingForVersion = false;
+                returnWriteIntent = true;
+
+                return writeIntentLink;
+            }
+
+            return nextVersionLink;
         } else {
-            // We are continuing reading a fragmented row.
+            // We continue reading a fragmented row.
             return readNextFragment(link, pageAddr, payload);
         }
     }
@@ -78,7 +132,7 @@ class ScanVersionChainByTimestamp implements PageMemoryTraversal<HybridTimestamp
         return readRowVersionValue.consumePagePayload(link, pageAddr, payload, null);
     }
 
-    private long advanceToNextVersion(long pageAddr, DataPagePayload payload) {
+    private long nextVersionLink(long pageAddr, DataPagePayload payload) {
         long nextLink = readPartitionlessLink(partitionId, pageAddr, payload.offset() + RowVersion.NEXT_LINK_OFFSET);
 
         if (nextLink == NULL_LINK) {
@@ -96,7 +150,7 @@ class ScanVersionChainByTimestamp implements PageMemoryTraversal<HybridTimestamp
     @Override
     public void finish() {
         if (lookingForVersion) {
-            // we never found the version -> we hever tried to read its value AND the result is null
+            // We never found the version -> we never tried to read its value AND the result is null.
             result = null;
             return;
         }
@@ -116,6 +170,14 @@ class ScanVersionChainByTimestamp implements PageMemoryTraversal<HybridTimestamp
         return result;
     }
 
+    boolean isResultWriteIntent() {
+        return returnWriteIntent;
+    }
+
+    HybridTimestamp lastCommittedTimestamp() {
+        return lastCommittedTimestamp;
+    }
+
     void reset() {
         result = null;
         lookingForVersion = true;
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java
index c650bed4dc..3e600a16c5 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.storage.pagememory.mv;
 import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
 
 import java.util.UUID;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.tostring.S;
 import org.jetbrains.annotations.Nullable;
@@ -37,19 +38,35 @@ public class VersionChain extends VersionChainKey {
     /** Link to the most recent version. */
     private final long headLink;
 
-    /** Link to the newest committed {@link RowVersion} if head is not yet committed, or {@link RowVersion#NULL_LINK} otherwise. */
+    /** Link to the newest committed {@link RowVersion} if head is not yet committed, or {@link PageIdUtils#NULL_LINK} otherwise. */
     private final long nextLink;
 
+    private final @Nullable UUID commitTableId;
+
+    private final int commitPartitionId;
+
     /**
      * Constructor.
      */
-    public VersionChain(RowId rowId, @Nullable UUID transactionId, long headLink, long nextLink) {
+    private VersionChain(RowId rowId, @Nullable UUID transactionId, @Nullable UUID commitTableId, int commitPartitionId, long headLink,
+            long nextLink) {
         super(rowId);
         this.transactionId = transactionId;
+        this.commitTableId = commitTableId;
+        this.commitPartitionId = commitPartitionId;
         this.headLink = headLink;
         this.nextLink = nextLink;
     }
 
+    public static VersionChain createCommitted(RowId rowId, long headLink, long nextLink) {
+        return new VersionChain(rowId, null, null, -1, headLink, nextLink);
+    }
+
+    public static VersionChain createUncommitted(RowId rowId, UUID transactionId, UUID commitTableId, int commitPartitionId, long headLink,
+            long nextLink) {
+        return new VersionChain(rowId, transactionId, commitTableId, commitPartitionId, headLink, nextLink);
+    }
+
     /**
      * Returns a transaction id, associated with a chain's head, or {@code null} if head is already committed.
      */
@@ -57,6 +74,14 @@ public class VersionChain extends VersionChainKey {
         return transactionId;
     }
 
+    public @Nullable UUID commitTableId() {
+        return commitTableId;
+    }
+
+    public int commitPartitionId() {
+        return commitPartitionId;
+    }
+
     /**
      * Returns a link to the newest {@link RowVersion} in the chain.
      */
@@ -65,7 +90,7 @@ public class VersionChain extends VersionChainKey {
     }
 
     /**
-     * Returns a link to the newest committed {@link RowVersion} if head is not yet committed, or {@link RowVersion#NULL_LINK} otherwise.
+     * Returns a link to the newest committed {@link RowVersion} if head is not yet committed, or {@link PageIdUtils#NULL_LINK} otherwise.
      *
      * @see #isUncommitted()
      * @see #newestCommittedLink()
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainIo.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainIo.java
index 7e5fcbe202..3215fec385 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainIo.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainIo.java
@@ -18,7 +18,9 @@
 package org.apache.ignite.internal.storage.pagememory.mv.io;
 
 import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getShort;
 import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putShort;
 import static org.apache.ignite.internal.pagememory.util.PartitionlessLinks.PARTITIONLESS_LINK_SIZE_BYTES;
 import static org.apache.ignite.internal.pagememory.util.PartitionlessLinks.readPartitionlessLink;
 import static org.apache.ignite.internal.pagememory.util.PartitionlessLinks.writePartitionlessLink;
@@ -36,6 +38,11 @@ import org.apache.ignite.internal.storage.pagememory.mv.VersionChainTree;
  * Interface for VersionChain B+Tree-related IO. Defines a following data layout:
  * <pre><code>[rowId's UUID (16 bytes), txId (16 bytes), head link (6 bytes), next link (6 bytes)]</code></pre>
  */
+// Review note (translated): RowId, link, (next link?), nullable txId - stored directly in the tree pages.
+//          |
+//          v
+//
+
 public interface VersionChainIo {
     /** Offset of rowId's most significant bits, 8 bytes. */
     int ROW_ID_MSB_OFFSET = 0;
@@ -49,8 +56,14 @@ public interface VersionChainIo {
     /** Offset of txId's least significant bits, 8 bytes. */
     int TX_ID_LSB_OFFSET = TX_ID_MSB_OFFSET + Long.BYTES;
 
+    int COMMIT_TABLE_ID_MSB_OFFSET = TX_ID_LSB_OFFSET + Long.BYTES;
+
+    int COMMIT_TABLE_ID_LSB_OFFSET = COMMIT_TABLE_ID_MSB_OFFSET + Long.BYTES;
+
+    int COMMIT_PARTITION_ID_OFFSET = COMMIT_TABLE_ID_LSB_OFFSET + Long.BYTES;
+
     /** Offset of partitionless head link, 6 bytes. */
-    int HEAD_LINK_OFFSET = TX_ID_LSB_OFFSET + Long.BYTES;
+    int HEAD_LINK_OFFSET = COMMIT_PARTITION_ID_OFFSET + Short.BYTES;
 
     /** Offset of partitionless next link, 6 bytes. */
     int NEXT_LINK_OFFSET = HEAD_LINK_OFFSET + PARTITIONLESS_LINK_SIZE_BYTES;
@@ -93,13 +106,25 @@ public interface VersionChainIo {
         putLong(pageAddr, off + ROW_ID_LSB_OFFSET, rowId.leastSignificantBits());
 
         UUID txId = row.transactionId();
+        UUID commitTableId = row.commitTableId();
+        int commitPartitionId = row.commitPartitionId();
 
         if (txId == null) {
+            assert commitTableId == null;
+            assert commitPartitionId == -1;
+
             putLong(pageAddr, off + TX_ID_MSB_OFFSET, NULL_UUID_COMPONENT);
             putLong(pageAddr, off + TX_ID_LSB_OFFSET, NULL_UUID_COMPONENT);
         } else {
+            assert commitTableId != null;
+            assert commitPartitionId >= 0;
+
             putLong(pageAddr, off + TX_ID_MSB_OFFSET, txId.getMostSignificantBits());
             putLong(pageAddr, off + TX_ID_LSB_OFFSET, txId.getLeastSignificantBits());
+
+            putLong(pageAddr, off + COMMIT_TABLE_ID_MSB_OFFSET, commitTableId.getMostSignificantBits());
+            putLong(pageAddr, off + COMMIT_TABLE_ID_LSB_OFFSET, commitTableId.getLeastSignificantBits());
+            putShort(pageAddr, off + COMMIT_PARTITION_ID_OFFSET, (short) commitPartitionId);
         }
 
         writePartitionlessLink(pageAddr + off + HEAD_LINK_OFFSET, row.headLink());
@@ -153,6 +178,17 @@ public interface VersionChainIo {
         long headLink = readPartitionlessLink(partitionId, pageAddr, offset + HEAD_LINK_OFFSET);
         long nextLink = readPartitionlessLink(partitionId, pageAddr, offset + NEXT_LINK_OFFSET);
 
-        return new VersionChain(rowId, txId, headLink, nextLink);
+        if (txId != null) {
+            long commitTblIdMsb = getLong(pageAddr, offset + COMMIT_TABLE_ID_MSB_OFFSET);
+            long commitTblIdLsb = getLong(pageAddr, offset + COMMIT_TABLE_ID_LSB_OFFSET);
+
+            UUID commitTableId = new UUID(commitTblIdMsb, commitTblIdLsb);
+
+            int commitPartitionId = getShort(pageAddr, offset + COMMIT_PARTITION_ID_OFFSET) & 0xFFFF;
+
+            return VersionChain.createUncommitted(rowId, txId, commitTableId, commitPartitionId, headLink, nextLink);
+        }
+
+        return VersionChain.createCommitted(rowId, headLink, nextLink);
     }
 }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 327dfa26c0..378379f426 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -38,6 +38,7 @@ import org.apache.ignite.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
@@ -57,13 +58,27 @@ import org.rocksdb.WriteBatchWithIndex;
 import org.rocksdb.WriteOptions;
 
 /**
- * Multi-versioned partition storage implementation based on RocksDB. Stored data has the following format:
+ * Multi-versioned partition storage implementation based on RocksDB. Stored data has the following format.
+ *
+ * <p/>Key:
  * <pre><code>
- * | partId (2 bytes, BE) | rowId (16 bytes, BE) |</code></pre>
- * or
+ * For write-intents
+ * | partId (2 bytes, BE) | rowId (16 bytes, BE) |
+ *
+ * For committed rows
+ * | partId (2 bytes, BE) | rowId (16 bytes, BE) | timestamp (12 bytes, DESC) |
+ * </code></pre>
+ * Value:
  * <pre><code>
- * | partId (2 bytes, BE) | rowId (16 bytes, BE) | timestamp (12 bytes, DESC) |</code></pre>
- * depending on transaction status. Pending transactions data doesn't have a timestamp assigned.
+ * For write-intents
+ * | txId (16 bytes) | commitTableId (16 bytes) | commitPartitionId (2 bytes) | Row data |
+ *
+ * For committed rows
+ * | Row data |
+ * </code></pre>
+ *
+ * <p/>Pending transactions (write-intents) data doesn't have a timestamp assigned, but they have transaction
+ * state (txId, commitTableId and commitPartitionId).
  *
  * <p/>BE means Big Endian, meaning that lexicographical bytes order matches a natural order of partitions.
  *
@@ -80,9 +95,30 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     /** Size of the key without timestamp. */
     private static final int ROW_PREFIX_SIZE = ROW_ID_OFFSET + ROW_ID_SIZE;
 
-    /** Transaction id size. */
+    /** Transaction id size (part of the transaction state). */
     private static final int TX_ID_SIZE = 2 * Long.BYTES;
 
+    /** Commit table id size (part of the transaction state). */
+    private static final int TABLE_ID_SIZE = 2 * Long.BYTES;
+
+    /** Commit partition id size (part of the transaction state). */
+    private static final int PARTITION_ID_SIZE = Short.BYTES;
+
+    /** Size of the value header (transaction state). */
+    private static final int VALUE_HEADER_SIZE = TX_ID_SIZE + TABLE_ID_SIZE + PARTITION_ID_SIZE;
+
+    /** Transaction id offset. */
+    private static final int TX_ID_OFFSET = 0;
+
+    /** Commit table id offset. */
+    private static final int TABLE_ID_OFFSET = TX_ID_SIZE;
+
+    /** Commit partition id offset. */
+    private static final int PARTITION_ID_OFFSET = TABLE_ID_OFFSET + TABLE_ID_SIZE;
+
+    /** Value offset (if transaction state is present). */
+    private static final int VALUE_OFFSET = VALUE_HEADER_SIZE;
+
     /** Maximum size of the key. */
     private static final int MAX_KEY_SIZE = ROW_PREFIX_SIZE + HYBRID_TIMESTAMP_SIZE;
 
@@ -272,7 +308,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
 
         try {
-            writeUnversioned(keyBuf.array(), row, txId);
+            writeUnversioned(keyBuf.array(), row, txId, UUID.randomUUID(), partitionId);
         } catch (RocksDBException e) {
             throw new StorageException("Failed to insert new row into storage", e);
         }
@@ -282,7 +318,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
     /** {@inheritDoc} */
     @Override
-    public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId)
+    public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, UUID commitTableId, int commitPartitionId)
             throws TxIdMismatchException, StorageException {
         WriteBatchWithIndex writeBatch = requireWriteBatch();
 
@@ -309,16 +345,18 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
                 // Write empty value as a tombstone.
                 if (previousValue != null) {
                     // Reuse old array with transaction id already written to it.
-                    writeBatch.put(cf, keyBytes, copyOf(previousValue, TX_ID_SIZE));
+                    writeBatch.put(cf, keyBytes, copyOf(previousValue, VALUE_HEADER_SIZE));
                 } else {
-                    byte[] txIdBytes = new byte[TX_ID_SIZE];
+                    byte[] valueHeaderBytes = new byte[VALUE_HEADER_SIZE];
 
-                    putTransactionId(txIdBytes, 0, txId);
+                    putUuid(valueHeaderBytes, TX_ID_OFFSET, txId);
+                    putUuid(valueHeaderBytes, TABLE_ID_OFFSET, commitTableId);
+                    putShort(valueHeaderBytes, PARTITION_ID_OFFSET, (short) commitPartitionId);
 
-                    writeBatch.put(cf, keyBytes, txIdBytes);
+                    writeBatch.put(cf, keyBytes, valueHeaderBytes);
                 }
             } else {
-                writeUnversioned(keyBufArray, row, txId);
+                writeUnversioned(keyBufArray, row, txId, commitTableId, commitPartitionId);
             }
         } catch (RocksDBException e) {
             throw new StorageException("Failed to update a row in storage", e);
@@ -335,17 +373,21 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
      * @param txId Transaction id.
      * @throws RocksDBException If write failed.
      */
-    private void writeUnversioned(byte[] keyArray, BinaryRow row, UUID txId) throws RocksDBException {
+    private void writeUnversioned(byte[] keyArray, BinaryRow row, UUID txId, UUID commitTableId, int commitPartitionId)
+            throws RocksDBException {
         WriteBatchWithIndex writeBatch = requireWriteBatch();
 
         //TODO IGNITE-16913 Add proper way to write row bytes into array without allocations.
         byte[] rowBytes = row.bytes();
 
-        ByteBuffer value = ByteBuffer.allocate(rowBytes.length + TX_ID_SIZE).order(LITTLE_ENDIAN);
+        ByteBuffer value = ByteBuffer.allocate(rowBytes.length + VALUE_HEADER_SIZE);
+        byte[] array = value.array();
 
-        putTransactionId(value.array(), 0, txId);
+        putUuid(array, TX_ID_OFFSET, txId);
+        putUuid(array, TABLE_ID_OFFSET, commitTableId);
+        putShort(array, PARTITION_ID_OFFSET, (short) commitPartitionId);
 
-        value.position(TX_ID_SIZE).put(rowBytes);
+        value.position(VALUE_OFFSET).put(rowBytes);
 
         // Write binary row data as a value.
         writeBatch.put(cf, copyOf(keyArray, ROW_PREFIX_SIZE), copyOf(value.array(), value.capacity()));
@@ -401,7 +443,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
             // Add timestamp to the key, and put the value back into the storage.
             putTimestamp(keyBuf, timestamp);
 
-            writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), copyOfRange(valueBytes, TX_ID_SIZE, valueBytes.length));
+            writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), copyOfRange(valueBytes, VALUE_HEADER_SIZE, valueBytes.length));
         } catch (RocksDBException e) {
             throw new StorageException("Failed to commit row into storage", e);
         }
@@ -409,17 +451,17 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
     /** {@inheritDoc} */
     @Override
-    public @Nullable BinaryRow read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException {
+    public @Nullable ReadResult read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException {
         return read(rowId, null, txId);
     }
 
     /** {@inheritDoc} */
     @Override
-    public @Nullable BinaryRow read(RowId rowId, HybridTimestamp timestamp) throws StorageException {
+    public @Nullable ReadResult read(RowId rowId, HybridTimestamp timestamp) throws StorageException {
         return read(rowId, timestamp, null);
     }
 
-    private @Nullable BinaryRow read(RowId rowId, @Nullable HybridTimestamp timestamp, @Nullable UUID txId)
+    private @Nullable ReadResult read(RowId rowId, @Nullable HybridTimestamp timestamp, @Nullable UUID txId)
             throws TxIdMismatchException, StorageException {
         assert timestamp == null ^ txId == null;
 
@@ -430,8 +472,6 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         // We can read data outside of consistency closure. Batch is not required.
         WriteBatchWithIndex writeBatch = WRITE_BATCH.get();
 
-        ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
-
         try (
                 // Set next partition as an upper bound.
                 var readOpts = new ReadOptions().setIterateUpperBound(upperBound);
@@ -443,56 +483,166 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
                         : baseIterator
         ) {
             if (timestamp == null) {
+                ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+
                 // Seek to the first appearance of row id if timestamp isn't set.
-                // Since timestamps are sorted from newest to oldest, first occurance will always be the latest version.
+                // Since timestamps are sorted from newest to oldest, first occurrence will always be the latest version.
                 // Unfortunately, copy here is unavoidable with current API.
-                seekIterator.seek(copyOf(keyBuf.array(), keyBuf.position()));
-            } else {
-                // Put timestamp restriction according to N2O timestamps order.
-                putTimestamp(keyBuf, timestamp);
+                assert keyBuf.position() == ROW_PREFIX_SIZE;
+                seekIterator.seek(copyOf(keyBuf.array(), ROW_PREFIX_SIZE));
+
+                if (invalid(seekIterator)) {
+                    // No data at all.
+                    return null;
+                }
+
+                ByteBuffer readKeyBuf = MV_KEY_BUFFER.get().position(0).limit(MAX_KEY_SIZE);
+
+                int keyLength = seekIterator.key(readKeyBuf);
+
+                if (!matches(rowId, readKeyBuf)) {
+                    // Wrong row id.
+                    return null;
+                }
+
+                boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
+
+                byte[] valueBytes = seekIterator.value();
+
+                if (!isWriteIntent) {
+                    // There is no write-intent, return latest committed row.
+                    return wrapCommittedValue(valueBytes);
+                }
 
-                // This seek will either find a key with timestamp that's less or equal than required value, or a different key whatsoever.
-                // It is guaranteed by descending order of timestamps.
-                seekIterator.seek(keyBuf.array());
+                assert valueBytes != null;
+
+                validateTxId(valueBytes, txId);
+
+                return wrapUncommittedValue(valueBytes, null);
+            } else {
+                return readByTimestamp(seekIterator, rowId, timestamp);
             }
+        }
+    }
+
+    private @Nullable ReadResult readByTimestamp(RocksIterator seekIterator, RowId rowId, HybridTimestamp timestamp) {
+        ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+
+        // Put timestamp restriction according to N2O timestamps order.
+        putTimestamp(keyBuf, timestamp);
+
+        // This seek will either find a key with timestamp that's less or equal than required value, or a different key whatsoever.
+        // It is guaranteed by descending order of timestamps.
+        seekIterator.seek(keyBuf.array());
+
+        // There's no guarantee that required key even exists. If it doesn't, then "seek" will point to a different key.
+        // To avoid returning its value, we have to check that actual key matches what we need.
+        // Here we prepare direct buffer to read key without timestamp. Shared direct buffer is used to avoid extra memory allocations.
+        ByteBuffer foundKeyBuf = MV_KEY_BUFFER.get().position(0).limit(MAX_KEY_SIZE);
+
+        int keyLength = 0;
+
+        if (!invalid(seekIterator)) {
+            keyLength = seekIterator.key(foundKeyBuf);
+        }
+
+        if (invalid(seekIterator) || !matches(rowId, foundKeyBuf)) {
+            // There is no record older than timestamp.
+            // There might be a write-intent which we should return.
+            // Seek to *just* row id.
+            seekIterator.seek(copyOf(keyBuf.array(), ROW_PREFIX_SIZE));
 
-            // Return null if nothing was found.
             if (invalid(seekIterator)) {
+                // There are no writes with row id.
+                return null;
+            }
+
+            foundKeyBuf.position(0).limit(MAX_KEY_SIZE);
+            keyLength = seekIterator.key(foundKeyBuf);
+
+            if (!matches(rowId, foundKeyBuf)) {
+                // There are no writes with row id.
                 return null;
             }
 
-            // There's no guarantee that required key even exists. If it doesn't, then "seek" will point to a different key, obviously.
-            // To avoid returning its value, we have to check that actual key matches what we need.
-            // Here we prepare direct buffer to read key without timestamp. Shared direct buffer is used to avoid extra memory allocations.
-            ByteBuffer directBuffer = MV_KEY_BUFFER.get().position(0).limit(MAX_KEY_SIZE);
+            byte[] valueBytes = seekIterator.value();
+
+            boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
 
-            int keyLength = seekIterator.key(directBuffer);
+            if (isWriteIntent) {
+                // Let's check if there is a committed write.
+                seekIterator.next();
 
-            boolean valueHasTxId = keyLength == ROW_PREFIX_SIZE;
+                if (invalid(seekIterator)) {
+                    // There are no committed writes, we can safely return write-intent.
+                    return wrapUncommittedValue(valueBytes, null);
+                }
 
-            // Comparison starts from the position of the row id.
-            directBuffer.position(ROW_ID_OFFSET);
+                foundKeyBuf.position(0).limit(MAX_KEY_SIZE);
+                seekIterator.key(foundKeyBuf);
 
-            // Return null if seek found a wrong key.
-            if (!matches(rowId, directBuffer)) {
-                return null;
+                if (!matches(rowId, foundKeyBuf)) {
+                    // There are no committed writes, we can safely return write-intent.
+                    return wrapUncommittedValue(valueBytes, null);
+                }
             }
 
-            // Get binary row from the iterator. It has the exact payload that we need.
+            // There is a committed write, but it's more recent than our timestamp (because we didn't find it with first seek).
+            return null;
+        } else {
+            // Should not be write-intent, as we seek'd with the timestamp.
+            assert keyLength == MAX_KEY_SIZE;
+
+            HybridTimestamp rowTimestamp = readTimestamp(foundKeyBuf, ROW_PREFIX_SIZE);
+
             byte[] valueBytes = seekIterator.value();
 
-            assert valueBytes != null;
+            if (rowTimestamp.equals(timestamp)) {
+                // This is exactly the row we are looking for.
+                return wrapCommittedValue(valueBytes);
+            }
 
-            if (txId != null && valueHasTxId) {
-                validateTxId(valueBytes, txId);
+            // Let's check if there is more recent write. If it is a write-intent, then return write-intent.
+            // If it is a committed write, then we are already in a right range.
+            seekIterator.prev();
+
+            if (invalid(seekIterator)) {
+                // There is no more recent commits or write-intents.
+                return wrapCommittedValue(valueBytes);
+            }
+
+            foundKeyBuf.position(0).limit(MAX_KEY_SIZE);
+            keyLength = seekIterator.key(foundKeyBuf);
+
+            if (!matches(rowId, foundKeyBuf)) {
+                // There is no more recent commits or write-intents under this row id.
+                return wrapCommittedValue(valueBytes);
+            }
+
+            boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
+
+            if (isWriteIntent) {
+                valueBytes = seekIterator.value();
+
+                return wrapUncommittedValue(valueBytes, rowTimestamp);
             }
 
-            return wrapValueIntoBinaryRow(valueBytes, valueHasTxId);
+            return wrapCommittedValue(valueBytes);
         }
     }
 
-    private static boolean matches(RowId rowId, ByteBuffer buf) {
-        return rowId.mostSignificantBits() == buf.getLong() && rowId.leastSignificantBits() == buf.getLong();
+    /**
+     * Checks if row id matches the one written in the key buffer. Note: this operation changes the position in the buffer.
+     *
+     * @param rowId Row id.
+     * @param keyByf Key buffer.
+     * @return {@code true} if row id matches the key buffer, {@code false} otherwise.
+     */
+    private static boolean matches(RowId rowId, ByteBuffer keyByf) {
+        // Comparison starts from the position of the row id.
+        keyByf.position(ROW_ID_OFFSET);
+
+        return rowId.mostSignificantBits() == keyByf.getLong() && rowId.leastSignificantBits() == keyByf.getLong();
     }
 
     //TODO IGNITE-16914 Play with prefix settings and benchmark results.
@@ -529,7 +679,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         // Byte buffer from a thread-local field can't be used here, because of two reasons:
         //  - no one guarantees that there will only be a single cursor;
         //  - no one guarantees that returned cursor will not be used by other threads.
-        // The thing is, we need this buffer to preserve its content between invocactions of "hasNext" method.
+        // The thing is, we need this buffer to preserve its content between invocations of "hasNext" method.
         ByteBuffer seekKeyBuf = ByteBuffer.allocate(seekKeyBufSize).order(BIG_ENDIAN).putShort((short) partitionId);
 
         if (timestamp != null) {
@@ -548,7 +698,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
             /** {@inheritDoc} */
             @Override
             public boolean hasNext() {
-                // Fastpath for consecutive invocations.
+                // Fast-path for consecutive invocations.
                 if (next != null) {
                     return true;
                 }
@@ -591,7 +741,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
                     // Read the actual key into a direct buffer.
                     int keyLength = it.key(directBuffer.limit(MAX_KEY_SIZE));
 
-                    boolean valueHasTxId = keyLength == ROW_PREFIX_SIZE;
+                    boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
 
                     directBuffer.limit(ROW_PREFIX_SIZE);
 
@@ -621,7 +771,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
                         // itself will be different, and we must check it.
                         keyLength = it.key(directBuffer.limit(MAX_KEY_SIZE));
 
-                        valueHasTxId = keyLength == ROW_PREFIX_SIZE;
+                        isWriteIntent = keyLength == ROW_PREFIX_SIZE;
 
                         directBuffer.limit(ROW_PREFIX_SIZE);
 
@@ -649,10 +799,10 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
                     if (found) {
                         byte[] valueBytes = it.value();
 
-                        BinaryRow binaryRow = wrapValueIntoBinaryRow(valueBytes, valueHasTxId);
+                        BinaryRow binaryRow = wrapValueIntoBinaryRow(valueBytes, isWriteIntent);
 
                         if (binaryRow != null && keyFilter.test(binaryRow)) {
-                            if (txId != null && valueHasTxId) {
+                            if (txId != null && isWriteIntent) {
                                 validateTxId(valueBytes, txId);
                             }
 
@@ -822,11 +972,24 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         buf.putInt(~ts.getLogical());
     }
 
-    private static void putTransactionId(byte[] array, int off, UUID txId) {
+    private static HybridTimestamp readTimestamp(ByteBuffer buf, int off) {
+        assert buf.order() == BIG_ENDIAN;
+
+        long physical = ~buf.getLong(off);
+        int logical = ~buf.getInt(off + Long.BYTES);
+
+        return new HybridTimestamp(physical, logical);
+    }
+
+    private static void putUuid(byte[] array, int off, UUID txId) {
         GridUnsafe.putLong(array, GridUnsafe.BYTE_ARR_OFF + off, txId.getMostSignificantBits());
         GridUnsafe.putLong(array, GridUnsafe.BYTE_ARR_OFF + off + Long.BYTES, txId.getLeastSignificantBits());
     }
 
+    private static void putShort(byte[] array, int off, short value) {
+        GridUnsafe.putShort(array, GridUnsafe.BYTE_ARR_OFF + off, value);
+    }
+
     private static void validateTxId(byte[] valueBytes, UUID txId) {
         long msb = GridUnsafe.getLong(valueBytes, GridUnsafe.BYTE_ARR_OFF);
         long lsb = GridUnsafe.getLong(valueBytes, GridUnsafe.BYTE_ARR_OFF + Long.BYTES);
@@ -859,19 +1022,60 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
      * Converts raw byte array representation of the value into a binary row.
      *
      * @param valueBytes Value bytes as read from the storage.
-     * @param valueHasTxId Whether the value has a transaction id prefix in it.
+     * @param valueHasTxData Whether the value has a transaction id prefix in it.
      * @return Binary row instance or {@code null} if value is a tombstone.
      */
-    private static @Nullable BinaryRow wrapValueIntoBinaryRow(byte[] valueBytes, boolean valueHasTxId) {
-        if (isTombstone(valueBytes, valueHasTxId)) {
+    private static @Nullable BinaryRow wrapValueIntoBinaryRow(byte[] valueBytes, boolean valueHasTxData) {
+        if (isTombstone(valueBytes, valueHasTxData)) {
             return null;
         }
 
-        return valueHasTxId
-                ? new ByteBufferRow(ByteBuffer.wrap(valueBytes).position(TX_ID_SIZE).slice().order(LITTLE_ENDIAN))
+        return valueHasTxData
+                ? new ByteBufferRow(ByteBuffer.wrap(valueBytes).position(VALUE_OFFSET).slice().order(LITTLE_ENDIAN))
                 : new ByteBufferRow(valueBytes);
     }
 
+    /**
+     * Converts raw byte array representation of the write-intent value into a read result adding newest commit timestamp if
+     * it is not {@code null}.
+     *
+     * @param valueBytes Value bytes as read from the storage.
+     * @param newestCommitTs Commit timestamp of the most recent committed write of this value.
+     * @return Read result instance or {@code null} if value is a tombstone.
+     */
+    private static @Nullable ReadResult wrapUncommittedValue(byte[] valueBytes, @Nullable HybridTimestamp newestCommitTs) {
+        if (isTombstone(valueBytes, true)) {
+            return null;
+        }
+
+        long txIdMsb = GridUnsafe.getLong(valueBytes, GridUnsafe.BYTE_ARR_OFF + TX_ID_OFFSET);
+        long txIdLsb = GridUnsafe.getLong(valueBytes, GridUnsafe.BYTE_ARR_OFF + TX_ID_OFFSET + Long.BYTES);
+
+        long commitTableIdMsb = GridUnsafe.getLong(valueBytes, GridUnsafe.BYTE_ARR_OFF + TABLE_ID_OFFSET);
+        long commitTableIdLsb = GridUnsafe.getLong(valueBytes, GridUnsafe.BYTE_ARR_OFF + TABLE_ID_OFFSET + Long.BYTES);
+
+        int commitPartitionId = GridUnsafe.getShort(valueBytes, GridUnsafe.BYTE_ARR_OFF + PARTITION_ID_OFFSET) & 0xFFFF;
+
+        BinaryRow row = new ByteBufferRow(ByteBuffer.wrap(valueBytes).position(VALUE_OFFSET).slice().order(LITTLE_ENDIAN));
+
+        return ReadResult.createFromWriteIntent(row, new UUID(txIdMsb, txIdLsb), new UUID(commitTableIdMsb, commitTableIdLsb),
+                newestCommitTs, commitPartitionId);
+    }
+
+    /**
+     * Converts raw byte array representation of the value into a read result.
+     *
+     * @param valueBytes Value bytes as read from the storage.
+     * @return Read result instance or {@code null} if value is a tombstone.
+     */
+    private static @Nullable ReadResult wrapCommittedValue(byte[] valueBytes) {
+        if (isTombstone(valueBytes, false)) {
+            return null;
+        }
+
+        return ReadResult.createFromCommitted(new ByteBufferRow(valueBytes));
+    }
+
     /**
      * Creates a prefix of all keys in the given partition.
      */
@@ -894,6 +1098,6 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
      * Returns {@code true} if value payload represents a tombstone.
      */
     private static boolean isTombstone(byte[] valueBytes, boolean hasTxId) {
-        return valueBytes.length == (hasTxId ? TX_ID_SIZE : 0);
+        return valueBytes.length == (hasTxId ? VALUE_HEADER_SIZE : 0);
     }
 }
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
index e51591470b..19e935122d 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -122,7 +122,7 @@ public class RocksDbMvTableStorageTest extends AbstractMvTableStorageTest {
 
         assertThat(tableStorage.getMvPartition(42), is(nullValue()));
         assertThat(tableStorage.getOrCreateMvPartition(42).read(rowId0, txId), is(nullValue()));
-        assertThat(unwrap(tableStorage.getMvPartition(1 << 8).read(rowId1, txId)), is(equalTo(unwrap(testData))));
+        assertThat(unwrap(tableStorage.getMvPartition(1 << 8).read(rowId1, txId).binaryRow()), is(equalTo(unwrap(testData))));
     }
 
     /**
@@ -146,7 +146,7 @@ public class RocksDbMvTableStorageTest extends AbstractMvTableStorageTest {
 
         assertThat(tableStorage.getMvPartition(0), is(notNullValue()));
         assertThat(tableStorage.getMvPartition(1), is(nullValue()));
-        assertThat(unwrap(tableStorage.getMvPartition(0).read(rowId0, txId)), is(equalTo(unwrap(testData))));
+        assertThat(unwrap(tableStorage.getMvPartition(0).read(rowId0, txId).binaryRow()), is(equalTo(unwrap(testData))));
     }
 
     @Test
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java
index e65ebeb739..6427e6ca51 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.tx.Timestamp;
 import org.apache.ignite.internal.tx.TxManager;
@@ -111,9 +112,9 @@ public class VersionedRowStore {
             return null;
         }
 
-        BinaryRow result = storage.read(rowId, txId);
+        ReadResult result = storage.read(rowId, txId);
 
-        return result;
+        return result != null ? result.binaryRow() : null;
     }
 
     /**
@@ -155,7 +156,7 @@ public class VersionedRowStore {
 
             txsInsertedKeys.computeIfAbsent(txId, entry -> new CopyOnWriteArrayList<ByteBuffer>()).add(key);
         } else {
-            storage.addWrite(rowId, row,  txId);
+            storage.addWrite(rowId, row,  txId, UUID.randomUUID(), 0);
         }
     }
 
@@ -193,13 +194,13 @@ public class VersionedRowStore {
             return false;
         }
 
-        BinaryRow prevRow = storage.read(primaryIndex.get(row.keySlice()), txId);
+        ReadResult prevRow = storage.read(primaryIndex.get(row.keySlice()), txId);
 
         if (prevRow == null) {
             return false;
         }
 
-        storage.addWrite(primaryIndex.get(row.keySlice()), null, txId);
+        storage.addWrite(primaryIndex.get(row.keySlice()), null, txId, UUID.randomUUID(), 0);
 
         txsRemovedKeys.computeIfAbsent(txId, entry -> new CopyOnWriteArrayList<>()).add(row.keySlice());