You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/12/26 07:54:25 UTC

[ignite-3] branch main updated: IGNITE-17467 Return RowIds in MvPartitionStorage#scan() results (#1475)

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 197a3f9677 IGNITE-17467 Return RowIds in MvPartitionStorage#scan() results (#1475)
197a3f9677 is described below

commit 197a3f9677fcfa4adc891c1c51fe780aa8a0c571
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Mon Dec 26 11:54:20 2022 +0400

    IGNITE-17467 Return RowIds in MvPartitionStorage#scan() results (#1475)
---
 .../apache/ignite/internal/storage/ReadResult.java | 33 +++++++--
 .../ignite/internal/storage/ReadResultTest.java    | 13 +++-
 .../storage/AbstractMvPartitionStorageTest.java    | 75 +++++++++++--------
 .../storage/BaseMvPartitionStorageTest.java        |  1 +
 .../internal/storage/BaseMvStoragesTest.java       |  2 +-
 .../storage/impl/TestMvPartitionStorage.java       | 84 +++++++++++++---------
 .../mv/AbstractPageMemoryMvPartitionStorage.java   | 36 ++++++----
 .../storage/rocksdb/RocksDbMvPartitionStorage.java | 56 ++++++++-------
 .../ItAbstractInternalTableScanTest.java           |  3 +-
 .../OutgoingSnapshotMvDataStreamingTest.java       | 49 +++++++------
 10 files changed, 215 insertions(+), 137 deletions(-)

diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
index 90f01bb21f..25e316b328 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
@@ -29,8 +29,8 @@ public class ReadResult {
     /** Unset commit partition id value. */
     public static final int UNDEFINED_COMMIT_PARTITION_ID = -1;
 
-    /** Empty read result. */
-    public static final ReadResult EMPTY = new ReadResult(null, null, null, null, null, UNDEFINED_COMMIT_PARTITION_ID);
+    /** ID of the row. */
+    private final RowId rowId;
 
     /** Data. {@code null} iff the result is empty (i.e. no row exists or it is a tombstone). */
     private final @Nullable BinaryRow binaryRow;
@@ -58,6 +58,7 @@ public class ReadResult {
     private final @Nullable HybridTimestamp newestCommitTs;
 
     private ReadResult(
+            RowId rowId,
             @Nullable BinaryRow binaryRow,
             @Nullable UUID transactionId,
             @Nullable UUID commitTableId,
@@ -65,6 +66,7 @@ public class ReadResult {
             @Nullable HybridTimestamp newestCommitTs,
             int commitPartitionId
     ) {
+        this.rowId = rowId;
         this.binaryRow = binaryRow;
 
         // If transaction is not null, then commitTableId and commitPartitionId should be defined.
@@ -80,13 +82,32 @@ public class ReadResult {
         this.commitPartitionId = commitPartitionId;
     }
 
-    public static ReadResult createFromWriteIntent(@Nullable BinaryRow binaryRow, UUID transactionId, UUID commitTableId,
+    /**
+     * Returns an empty read result (that is, a result for a missing row ID or for a tombstone) for the given row ID.
+     *
+     * @param rowId ID of the row for which to create a ReadResult.
+     * @return An empty read result.
+     */
+    public static ReadResult empty(RowId rowId) {
+        return new ReadResult(rowId, null, null, null, null, null, UNDEFINED_COMMIT_PARTITION_ID);
+    }
+
+    public static ReadResult createFromWriteIntent(RowId rowId, @Nullable BinaryRow binaryRow, UUID transactionId, UUID commitTableId,
             int commitPartitionId, @Nullable HybridTimestamp lastCommittedTimestamp) {
-        return new ReadResult(binaryRow, transactionId, commitTableId, null, lastCommittedTimestamp, commitPartitionId);
+        return new ReadResult(rowId, binaryRow, transactionId, commitTableId, null, lastCommittedTimestamp, commitPartitionId);
     }
 
-    public static ReadResult createFromCommitted(@Nullable BinaryRow binaryRow, HybridTimestamp commitTs) {
-        return new ReadResult(binaryRow, null, null, commitTs, null, UNDEFINED_COMMIT_PARTITION_ID);
+    public static ReadResult createFromCommitted(RowId rowId, @Nullable BinaryRow binaryRow, HybridTimestamp commitTs) {
+        return new ReadResult(rowId, binaryRow, null, null, commitTs, null, UNDEFINED_COMMIT_PARTITION_ID);
+    }
+
+    /**
+     * Returns ID of the corresponding row.
+     *
+     * @return ID of the corresponding row.
+     */
+    public RowId rowId() {
+        return rowId;
     }
 
     /**
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/ReadResultTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/ReadResultTest.java
index 9310392568..d192300d27 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/ReadResultTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/ReadResultTest.java
@@ -19,17 +19,24 @@ package org.apache.ignite.internal.storage;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.junit.jupiter.api.Test;
 
 class ReadResultTest {
     @Test
-    void resultInEmptyConstantIsEmpty() {
-        assertTrue(ReadResult.EMPTY.isEmpty());
+    void resultProducecByEmptyMethodIsEmpty() {
+        ReadResult emptyResult = ReadResult.empty(anyRowId());
+
+        assertTrue(emptyResult.isEmpty());
+    }
+
+    private static RowId anyRowId() {
+        return new RowId(0);
     }
 
     @Test
     void resultWithNullRowIsEmpty() {
-        ReadResult result = ReadResult.createFromCommitted(null, null);
+        ReadResult result = ReadResult.createFromCommitted(anyRowId(), null, HybridTimestamp.MAX_VALUE);
 
         assertTrue(result.isEmpty());
     }
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index ac315cb10e..1c7bf67cdd 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -17,10 +17,11 @@
 
 package org.apache.ignite.internal.storage;
 
+import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasItems;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.not;
@@ -40,7 +41,6 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
@@ -77,7 +77,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
 
     @ParameterizedTest
     @EnumSource
-    public void testScanOverEmpty(ScanTimestampProvider tsProvider) throws Exception {
+    public void testScanOverEmpty(ScanTimestampProvider tsProvider) {
         assertEquals(List.of(), convert(scan(tsProvider.scanTimestamp(clock))));
     }
 
@@ -192,7 +192,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
      * Tests basic invariants of {@link MvPartitionStorage#scan(HybridTimestamp)}.
      */
     @Test
-    public void testScan() throws Exception {
+    public void testScan() {
         TestKey key1 = new TestKey(1, "1");
         TestValue value1 = new TestValue(10, "xxx");
 
@@ -258,7 +258,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
 
             assertThrows(NoSuchElementException.class, () -> cursor.next());
 
-            assertThat(res, hasItems(value1, value2));
+            assertThat(res, containsInAnyOrder(value1, value2));
         }
     }
 
@@ -345,27 +345,29 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
             return cursor.stream()
                     .map((ReadResult rs) -> BaseMvStoragesTest.value(rs.binaryRow()))
                     .sorted(Comparator.nullsFirst(Comparator.naturalOrder()))
-                    .collect(Collectors.toList());
+                    .collect(toList());
         }
     }
 
     @Test
-    void readOfUncommittedRowWithCorrespondingTransactionIdReturnsTheRow() {
+    void readOfUncommittedRowReturnsTheRow() {
         RowId rowId = insert(binaryRow, txId);
 
-        BinaryRow foundRow = read(rowId, HybridTimestamp.MAX_VALUE);
+        ReadResult foundResult = storage.read(rowId, HybridTimestamp.MAX_VALUE);
 
-        assertRowMatches(foundRow, binaryRow);
+        assertThat(foundResult.rowId(), is(rowId));
+        assertRowMatches(foundResult.binaryRow(), binaryRow);
     }
 
     @Test
-    void readOfCommittedRowWithAnyTransactionIdReturnsTheRow() {
+    void readOfCommittedRowReturnsTheRow() {
         RowId rowId = insert(binaryRow, txId);
         commitWrite(rowId, clock.now());
 
-        BinaryRow foundRow = read(rowId, HybridTimestamp.MAX_VALUE);
+        ReadResult foundResult = storage.read(rowId, HybridTimestamp.MAX_VALUE);
 
-        assertRowMatches(foundRow, binaryRow);
+        assertThat(foundResult.rowId(), is(rowId));
+        assertRowMatches(foundResult.binaryRow(), binaryRow);
     }
 
     @Test
@@ -375,9 +377,10 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
 
         RowId rowId2 = insert(binaryRow2, txId);
 
-        BinaryRow foundRow = read(rowId2, HybridTimestamp.MAX_VALUE);
+        ReadResult foundResult = storage.read(rowId2, HybridTimestamp.MAX_VALUE);
 
-        assertRowMatches(foundRow, binaryRow2);
+        assertThat(foundResult.rowId(), is(rowId2));
+        assertRowMatches(foundResult.binaryRow(), binaryRow2);
     }
 
     @Test
@@ -388,9 +391,10 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
         RowId rowId2 = insert(binaryRow2, txId);
         commitWrite(rowId2, clock.now());
 
-        BinaryRow foundRow = read(rowId2, HybridTimestamp.MAX_VALUE);
+        ReadResult foundResult = storage.read(rowId2, HybridTimestamp.MAX_VALUE);
 
-        assertRowMatches(foundRow, binaryRow2);
+        assertThat(foundResult.rowId(), is(rowId2));
+        assertRowMatches(foundResult.binaryRow(), binaryRow2);
     }
 
     @Test
@@ -685,10 +689,11 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
     void readByTimestampWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite() {
         RowId rowId = commitAbortAndAddUncommitted();
 
-        BinaryRow foundRow = storage.read(rowId, clock.now()).binaryRow();
+        ReadResult foundResult = storage.read(rowId, clock.now());
 
         // We see the uncommitted row.
-        assertRowMatches(foundRow, binaryRow3);
+        assertThat(foundResult.rowId(), is(rowId));
+        assertRowMatches(foundResult.binaryRow(), binaryRow3);
     }
 
     @Test
@@ -737,7 +742,6 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
             RowId rowId = new RowId(PARTITION_ID);
 
             storage.addWrite(rowId, binaryRow, txId, UUID.randomUUID(), 0);
-
             commitWrite(rowId, clock.now());
 
             addWrite(rowId, binaryRow2, newTransactionId());
@@ -752,12 +756,13 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
     @ParameterizedTest
     @EnumSource(ScanTimestampProvider.class)
     void scanWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite(ScanTimestampProvider tsProvider) {
-        commitAbortAndAddUncommitted();
+        RowId rowId = commitAbortAndAddUncommitted();
 
         try (Cursor<ReadResult> cursor = storage.scan(tsProvider.scanTimestamp(clock))) {
-            BinaryRow foundRow = cursor.next().binaryRow();
+            ReadResult result = cursor.next();
 
-            assertRowMatches(foundRow, binaryRow3);
+            assertThat(result.rowId(), is(rowId));
+            assertRowMatches(result.binaryRow(), binaryRow3);
 
             assertFalse(cursor.hasNext());
         }
@@ -998,7 +1003,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
         // Reverse expected values to simplify comparison - they are returned in reversed order, newest to oldest.
         Collections.reverse(values);
 
-        List<IgniteBiTuple<TestKey, TestValue>> list = drainToList(storage.scanVersions(rowId));
+        List<IgniteBiTuple<TestKey, TestValue>> list = drainVerifyingRowIdMatch(storage.scanVersions(rowId), rowId);
 
         assertEquals(values.size(), list.size());
 
@@ -1011,10 +1016,19 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
         }
     }
 
+    private static List<IgniteBiTuple<TestKey, TestValue>> drainVerifyingRowIdMatch(Cursor<ReadResult> cursor, RowId rowId) {
+        try (Cursor<ReadResult> c = cursor) {
+            return cursor.stream()
+                    .peek(result -> assertThat(result.rowId(), is(rowId)))
+                    .map(BaseMvStoragesTest::unwrap)
+                    .collect(toList());
+        }
+    }
+
     @ParameterizedTest
     @EnumSource(ScanTimestampProvider.class)
     void testScanWithWriteIntent(ScanTimestampProvider tsProvider) {
-        HybridTimestamp commitTs = addCommittedVersionAndWriteIntent();
+        IgniteBiTuple<RowId, HybridTimestamp> rowIdAndCommitTs = addCommittedVersionAndWriteIntent();
 
         try (PartitionTimestampCursor cursor = storage.scan(tsProvider.scanTimestamp(clock))) {
             assertTrue(cursor.hasNext());
@@ -1023,15 +1037,16 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
 
             assertTrue(next.isWriteIntent());
 
+            assertThat(next.rowId(), is(rowIdAndCommitTs.get1()));
             assertRowMatches(next.binaryRow(), binaryRow2);
 
-            BinaryRow committedRow = cursor.committed(commitTs);
+            BinaryRow committedRow = cursor.committed(rowIdAndCommitTs.get2());
 
             assertRowMatches(committedRow, binaryRow);
         }
     }
 
-    private HybridTimestamp addCommittedVersionAndWriteIntent() {
+    private IgniteBiTuple<RowId, HybridTimestamp> addCommittedVersionAndWriteIntent() {
         RowId rowId = new RowId(PARTITION_ID);
 
         HybridTimestamp commitTs = clock.now();
@@ -1046,7 +1061,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
             return null;
         });
 
-        return commitTs;
+        return new IgniteBiTuple<>(rowId, commitTs);
     }
 
     @Test
@@ -1069,7 +1084,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
             commitWrite(newRowId, clock.now());
         });
 
-        List<IgniteBiTuple<TestKey, TestValue>> list = drainToList(storage.scanVersions(rowId));
+        List<IgniteBiTuple<TestKey, TestValue>> list = drainVerifyingRowIdMatch(storage.scanVersions(rowId), rowId);
 
         assertEquals(2, list.size());
 
@@ -1141,6 +1156,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
         try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
             ReadResult result = cursor.next();
 
+            assertThat(result.rowId(), is(rowId));
             assertTrue(result.isWriteIntent());
             assertThat(result.commitPartitionId(), is(not(ReadResult.UNDEFINED_COMMIT_PARTITION_ID)));
             assertThat(result.commitTableId(), is(notNullValue()));
@@ -1158,6 +1174,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
         try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
             ReadResult result = cursor.next();
 
+            assertThat(result.rowId(), is(rowId));
             assertFalse(result.isWriteIntent());
             assertThat(result.commitPartitionId(), is(ReadResult.UNDEFINED_COMMIT_PARTITION_ID));
             assertThat(result.commitTableId(), is(nullValue()));
@@ -1195,6 +1212,8 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
             assertTrue(cursor.hasNext());
 
             ReadResult next = cursor.next();
+
+            assertThat(next.rowId(), is(rowId));
             assertNull(next.binaryRow());
             assertEquals(commitTs, next.newestCommitTimestamp());
 
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
index f59293ca1e..8bd06d80ce 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
@@ -97,6 +97,7 @@ public abstract class BaseMvPartitionStorageTest extends BaseMvStoragesTest {
     /**
      * Reads a row.
      */
+    @Nullable
     protected BinaryRow read(RowId rowId, HybridTimestamp timestamp) {
         ReadResult readResult = storage.read(rowId, timestamp);
 
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
index 767cb1a017..7389cfd183 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
@@ -168,7 +168,7 @@ public abstract class BaseMvStoragesTest {
         }
     }
 
-    protected final void assertRowMatches(BinaryRow rowUnderQuestion, BinaryRow expectedRow) {
+    protected final void assertRowMatches(@Nullable BinaryRow rowUnderQuestion, BinaryRow expectedRow) {
         assertThat(rowUnderQuestion, is(notNullValue()));
         assertThat(rowUnderQuestion.bytes(), is(equalTo(expectedRow.bytes())));
     }
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index a1473b77e8..02e5251418 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -41,7 +41,6 @@ import org.apache.ignite.internal.storage.StorageClosedException;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
 import org.apache.ignite.internal.util.Cursor;
-import org.apache.ignite.lang.IgniteBiTuple;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -50,9 +49,9 @@ import org.jetbrains.annotations.Nullable;
 public class TestMvPartitionStorage implements MvPartitionStorage {
     private final ConcurrentNavigableMap<RowId, VersionChain> map = new ConcurrentSkipListMap<>();
 
-    private final NavigableSet<IgniteBiTuple<VersionChain, RowId>> gcQueue = new ConcurrentSkipListSet<>(
-            comparing((IgniteBiTuple<VersionChain, RowId> p) -> p.get1().ts)
-                    .thenComparing(IgniteBiTuple::get2)
+    private final NavigableSet<VersionChain> gcQueue = new ConcurrentSkipListSet<>(
+            comparing((VersionChain chain) -> chain.ts)
+                    .thenComparing(chain -> chain.rowId)
     );
 
     private volatile long lastAppliedIndex;
@@ -71,6 +70,7 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
     }
 
     private static class VersionChain {
+        final RowId rowId;
         final @Nullable BinaryRow row;
         final @Nullable HybridTimestamp ts;
         final @Nullable UUID txId;
@@ -78,8 +78,9 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
         final int commitPartitionId;
         volatile @Nullable VersionChain next;
 
-        VersionChain(@Nullable BinaryRow row, @Nullable HybridTimestamp ts, @Nullable UUID txId, @Nullable UUID commitTableId,
+        VersionChain(RowId rowId, @Nullable BinaryRow row, @Nullable HybridTimestamp ts, @Nullable UUID txId, @Nullable UUID commitTableId,
                 int commitPartitionId, @Nullable VersionChain next) {
+            this.rowId = rowId;
             this.row = row;
             this.ts = ts;
             this.txId = txId;
@@ -88,13 +89,13 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
             this.next = next;
         }
 
-        static VersionChain forWriteIntent(@Nullable BinaryRow row, @Nullable UUID txId, @Nullable UUID commitTableId,
+        static VersionChain forWriteIntent(RowId rowId, @Nullable BinaryRow row, @Nullable UUID txId, @Nullable UUID commitTableId,
                 int commitPartitionId, @Nullable VersionChain next) {
-            return new VersionChain(row, null, txId, commitTableId, commitPartitionId, next);
+            return new VersionChain(rowId, row, null, txId, commitTableId, commitPartitionId, next);
         }
 
-        static VersionChain forCommitted(@Nullable HybridTimestamp timestamp, VersionChain uncommittedVersionChain) {
-            return new VersionChain(uncommittedVersionChain.row, timestamp, null, null,
+        static VersionChain forCommitted(RowId rowId, @Nullable HybridTimestamp timestamp, VersionChain uncommittedVersionChain) {
+            return new VersionChain(rowId, uncommittedVersionChain.row, timestamp, null, null,
                     ReadResult.UNDEFINED_COMMIT_PARTITION_ID, uncommittedVersionChain.next);
         }
 
@@ -179,10 +180,10 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
 
                 res[0] = versionChain.row;
 
-                return VersionChain.forWriteIntent(row, txId, commitTableId, commitPartitionId, versionChain.next);
+                return VersionChain.forWriteIntent(rowId, row, txId, commitTableId, commitPartitionId, versionChain.next);
             }
 
-            return VersionChain.forWriteIntent(row, txId, commitTableId, commitPartitionId, versionChain);
+            return VersionChain.forWriteIntent(rowId, row, txId, commitTableId, commitPartitionId, versionChain);
         });
 
         return res[0];
@@ -220,7 +221,7 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
                 return versionChain;
             }
 
-            return resolveCommittedVersionChain(rowId, VersionChain.forCommitted(timestamp, versionChain));
+            return resolveCommittedVersionChain(VersionChain.forCommitted(rowId, timestamp, versionChain));
         });
     }
 
@@ -237,7 +238,8 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
                 throw new StorageException("Write intent exists for " + rowId);
             }
 
-            return resolveCommittedVersionChain(rowId, new VersionChain(
+            return resolveCommittedVersionChain(new VersionChain(
+                    rowId,
                     row,
                     commitTimestamp,
                     null,
@@ -248,17 +250,18 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
         });
     }
 
-    @Nullable
-    private VersionChain resolveCommittedVersionChain(RowId rowId, VersionChain committedVersionChain) {
-        if (committedVersionChain.next != null) {
+    private VersionChain resolveCommittedVersionChain(VersionChain committedVersionChain) {
+        VersionChain nextChain = committedVersionChain.next;
+
+        if (nextChain != null) {
             // Avoid creating tombstones for tombstones.
-            if (committedVersionChain.row == null && committedVersionChain.next.row == null) {
-                return committedVersionChain.next;
+            if (committedVersionChain.row == null && nextChain.row == null) {
+                return nextChain;
             }
 
             // Calling it from the compute is fine. Concurrent writes of the same row are impossible, and if we call the compute closure
             // several times, the same tuple will be inserted into the GC queue (timestamp and rowId don't change in this case).
-            gcQueue.add(new IgniteBiTuple<>(committedVersionChain, rowId));
+            gcQueue.add(committedVersionChain);
         }
 
         return committedVersionChain;
@@ -275,6 +278,10 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
 
         VersionChain versionChain = map.get(rowId);
 
+        if (versionChain == null) {
+            return ReadResult.empty(rowId);
+        }
+
         return read(versionChain, timestamp, null);
     }
 
@@ -293,10 +300,6 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
     ) {
         assert timestamp == null ^ txId == null;
 
-        if (versionChain == null) {
-            return ReadResult.EMPTY;
-        }
-
         if (timestamp == null) {
             // Search by transaction id.
 
@@ -315,7 +318,7 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
                 // We *only* have a write-intent, return it.
                 BinaryRow binaryRow = cur.row;
 
-                return ReadResult.createFromWriteIntent(binaryRow, cur.txId, cur.commitTableId, cur.commitPartitionId, null);
+                return ReadResult.createFromWriteIntent(cur.rowId, binaryRow, cur.txId, cur.commitTableId, cur.commitPartitionId, null);
             }
 
             // Move to first commit.
@@ -327,15 +330,18 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
 
     private static ReadResult versionChainToReadResult(VersionChain versionChain, boolean fillLastCommittedTs) {
         if (versionChain.isWriteIntent()) {
+            VersionChain next = versionChain.next;
+
             return ReadResult.createFromWriteIntent(
+                    versionChain.rowId,
                     versionChain.row,
                     versionChain.txId,
                     versionChain.commitTableId,
-                    versionChain.commitPartitionId, fillLastCommittedTs && versionChain.next != null ? versionChain.next.ts : null
+                    versionChain.commitPartitionId, fillLastCommittedTs && next != null ? next.ts : null
             );
         }
 
-        return ReadResult.createFromCommitted(versionChain.row, versionChain.ts);
+        return ReadResult.createFromCommitted(versionChain.rowId, versionChain.row, versionChain.ts);
     }
 
     /**
@@ -354,8 +360,14 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
             // So we just return write-intent.
             BinaryRow binaryRow = chainHead.row;
 
-            return ReadResult.createFromWriteIntent(binaryRow, chainHead.txId, chainHead.commitTableId, chainHead.commitPartitionId,
-                    firstCommit.ts);
+            return ReadResult.createFromWriteIntent(
+                    chainHead.rowId,
+                    binaryRow,
+                    chainHead.txId,
+                    chainHead.commitTableId,
+                    chainHead.commitPartitionId,
+                    firstCommit.ts
+            );
         }
 
         VersionChain cur = firstCommit;
@@ -367,13 +379,13 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
                 // This commit has timestamp matching the query ts, meaning that commit is the one we are looking for.
                 BinaryRow binaryRow = cur.row;
 
-                return ReadResult.createFromCommitted(binaryRow, cur.ts);
+                return ReadResult.createFromCommitted(cur.rowId, binaryRow, cur.ts);
             }
 
             cur = cur.next;
         }
 
-        return ReadResult.EMPTY;
+        return ReadResult.empty(chainHead.rowId);
     }
 
     @Override
@@ -455,6 +467,8 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
 
                 ReadResult res = currentReadResult;
 
+                assert res != null;
+
                 currentReadResult = null;
 
                 return res;
@@ -471,22 +485,22 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
 
     @Override
     public synchronized @Nullable BinaryRowAndRowId pollForVacuum(HybridTimestamp lowWatermark) {
-        Iterator<IgniteBiTuple<VersionChain, RowId>> it = gcQueue.iterator();
+        Iterator<VersionChain> it = gcQueue.iterator();
 
         if (!it.hasNext()) {
             return null;
         }
 
-        IgniteBiTuple<VersionChain, RowId> next = it.next();
-        VersionChain dequeuedVersionChain = next.get1();
+        VersionChain dequeuedVersionChain = it.next();
 
         if (dequeuedVersionChain.ts.compareTo(lowWatermark) > 0) {
             return null;
         }
 
-        RowId rowId = next.get2();
+        RowId rowId = dequeuedVersionChain.rowId;
 
         VersionChain versionChainToRemove = dequeuedVersionChain.next;
+        assert versionChainToRemove != null;
         assert versionChainToRemove.next == null;
 
         dequeuedVersionChain.next = null;
@@ -503,7 +517,7 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
                     if (cur.next == dequeuedVersionChain) {
                         cur.next = null;
 
-                        gcQueue.remove(new IgniteBiTuple<>(cur, rowId));
+                        gcQueue.remove(cur);
                     }
                 }
 
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 86a2e278e1..cd24c83611 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -322,7 +322,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
             VersionChain versionChain = findVersionChain(rowId);
 
             if (versionChain == null) {
-                return ReadResult.EMPTY;
+                return ReadResult.empty(rowId);
             }
 
             if (lookingForLatestVersion(timestamp)) {
@@ -364,7 +364,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         } else {
             ByteBufferRow row = rowVersionToBinaryRow(rowVersion);
 
-            return ReadResult.createFromCommitted(row, rowVersion.timestamp());
+            return ReadResult.createFromCommitted(versionChain.rowId(), row, rowVersion.timestamp());
         }
     }
 
@@ -424,22 +424,22 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
     /**
      * Walks version chain to find a row by timestamp. See {@link MvPartitionStorage#read(RowId, HybridTimestamp)} for details.
      *
-     * @param chainHead Version chain head.
+     * @param chain Version chain head.
      * @param timestamp Timestamp.
      * @return Read result.
      */
-    private ReadResult walkVersionChain(VersionChain chainHead, HybridTimestamp timestamp) {
-        assert chainHead.hasCommittedVersions();
+    private ReadResult walkVersionChain(VersionChain chain, HybridTimestamp timestamp) {
+        assert chain.hasCommittedVersions();
 
-        boolean hasWriteIntent = chainHead.isUncommitted();
+        boolean hasWriteIntent = chain.isUncommitted();
 
         RowVersion firstCommit;
 
         if (hasWriteIntent) {
             // First commit can only match if its timestamp matches query timestamp.
-            firstCommit = readRowVersion(chainHead.nextLink(), rowTimestamp -> timestamp.compareTo(rowTimestamp) == 0);
+            firstCommit = readRowVersion(chain.nextLink(), rowTimestamp -> timestamp.compareTo(rowTimestamp) == 0);
         } else {
-            firstCommit = readRowVersion(chainHead.headLink(), rowTimestamp -> timestamp.compareTo(rowTimestamp) >= 0);
+            firstCommit = readRowVersion(chain.headLink(), rowTimestamp -> timestamp.compareTo(rowTimestamp) >= 0);
         }
 
         assert firstCommit.isCommitted();
@@ -448,9 +448,9 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         if (hasWriteIntent && timestamp.compareTo(firstCommit.timestamp()) > 0) {
             // It's the latest commit in chain, query ts is greater than commit ts and there is a write-intent.
             // So we just return write-intent.
-            RowVersion rowVersion = readRowVersion(chainHead.headLink(), ALWAYS_LOAD_VALUE);
+            RowVersion rowVersion = readRowVersion(chain.headLink(), ALWAYS_LOAD_VALUE);
 
-            return writeIntentToResult(chainHead, rowVersion, firstCommit.timestamp());
+            return writeIntentToResult(chain, rowVersion, firstCommit.timestamp());
         }
 
         RowVersion curCommit = firstCommit;
@@ -470,7 +470,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
                     row = new ByteBufferRow(curCommit.value());
                 }
 
-                return ReadResult.createFromCommitted(row, curCommit.timestamp());
+                return ReadResult.createFromCommitted(chain.rowId(), row, curCommit.timestamp());
             }
 
             if (!curCommit.hasNextLink()) {
@@ -480,7 +480,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
             }
         } while (curCommit != null);
 
-        return ReadResult.EMPTY;
+        return ReadResult.empty(chain.rowId());
     }
 
     private ReadResult writeIntentToResult(VersionChain chain, RowVersion rowVersion, @Nullable HybridTimestamp lastCommittedTimestamp) {
@@ -492,7 +492,14 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
 
         BinaryRow row = rowVersionToBinaryRow(rowVersion);
 
-        return ReadResult.createFromWriteIntent(row, transactionId, commitTableId, commitPartitionId, lastCommittedTimestamp);
+        return ReadResult.createFromWriteIntent(
+                chain.rowId(),
+                row,
+                transactionId,
+                commitTableId,
+                commitPartitionId,
+                lastCommittedTimestamp
+        );
     }
 
     private RowVersion insertRowVersion(@Nullable BinaryRow row, long nextPartitionlessLink) {
@@ -760,9 +767,10 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         ByteBufferRow row = new ByteBufferRow(rowVersion.value());
 
         if (rowVersion.isCommitted()) {
-            return ReadResult.createFromCommitted(row, rowVersion.timestamp());
+            return ReadResult.createFromCommitted(versionChain.rowId(), row, rowVersion.timestamp());
         } else {
             return ReadResult.createFromWriteIntent(
+                    versionChain.rowId(),
                     row,
                     versionChain.transactionId(),
                     versionChain.commitTableId(),
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 3dd513a86a..a609aa3719 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -596,7 +596,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
         if (invalid(seekIterator)) {
             // No data at all.
-            return ReadResult.EMPTY;
+            return ReadResult.empty(rowId);
         }
 
         ByteBuffer readKeyBuf = MV_KEY_BUFFER.get().position(0).limit(MAX_KEY_SIZE);
@@ -605,7 +605,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
         if (!matches(rowId, readKeyBuf)) {
             // It is already a different row, so no version exists for our rowId.
-            return ReadResult.EMPTY;
+            return ReadResult.empty(rowId);
         }
 
         boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
@@ -615,15 +615,17 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         return readResultFromKeyAndValue(isWriteIntent, readKeyBuf, valueBytes);
     }
 
-    private static ReadResult readResultFromKeyAndValue(boolean isWriteIntent, ByteBuffer keyBuf, byte[] valueBytes) {
+    private ReadResult readResultFromKeyAndValue(boolean isWriteIntent, ByteBuffer keyBuf, byte[] valueBytes) {
         assert valueBytes != null;
 
+        RowId rowId = getRowId(keyBuf);
+
         if (!isWriteIntent) {
             // There is no write-intent, return latest committed row.
-            return wrapCommittedValue(valueBytes, readTimestamp(keyBuf));
+            return wrapCommittedValue(rowId, valueBytes, readTimestamp(keyBuf));
         }
 
-        return wrapUncommittedValue(valueBytes, null);
+        return wrapUncommittedValue(rowId, valueBytes, null);
     }
 
     /**
@@ -657,7 +659,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
      * @param keyBuf Buffer with a key in it: partition id + row id + timestamp.
      * @return Read result.
      */
-    private static ReadResult handleReadByTimestampIterator(RocksIterator seekIterator, RowId rowId, HybridTimestamp timestamp,
+    private ReadResult handleReadByTimestampIterator(RocksIterator seekIterator, RowId rowId, HybridTimestamp timestamp,
             ByteBuffer keyBuf) {
         // There's no guarantee that required key even exists. If it doesn't, then "seek" will point to a different key.
         // To avoid returning its value, we have to check that actual key matches what we need.
@@ -678,7 +680,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
             if (invalid(seekIterator)) {
                 // There are no writes with row id.
-                return ReadResult.EMPTY;
+                return ReadResult.empty(rowId);
             }
 
             foundKeyBuf.position(0).limit(MAX_KEY_SIZE);
@@ -686,7 +688,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
             if (!matches(rowId, foundKeyBuf)) {
                 // There are no writes with row id.
-                return ReadResult.EMPTY;
+                return ReadResult.empty(rowId);
             }
 
             byte[] valueBytes = seekIterator.value();
@@ -699,7 +701,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
                 if (invalid(seekIterator)) {
                     // There are no committed writes, we can safely return write-intent.
-                    return wrapUncommittedValue(valueBytes, null);
+                    return wrapUncommittedValue(rowId, valueBytes, null);
                 }
 
                 foundKeyBuf.position(0).limit(MAX_KEY_SIZE);
@@ -707,12 +709,12 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
                 if (!matches(rowId, foundKeyBuf)) {
                     // There are no committed writes, we can safely return write-intent.
-                    return wrapUncommittedValue(valueBytes, null);
+                    return wrapUncommittedValue(rowId, valueBytes, null);
                 }
             }
 
             // There is a committed write, but it's more recent than our timestamp (because we didn't find it with first seek).
-            return ReadResult.EMPTY;
+            return ReadResult.empty(rowId);
         } else {
             // Should not be write-intent, as we were seeking with the timestamp.
             assert keyLength == MAX_KEY_SIZE;
@@ -723,7 +725,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
             if (rowTimestamp.equals(timestamp)) {
                 // This is exactly the row we are looking for.
-                return wrapCommittedValue(valueBytes, rowTimestamp);
+                return wrapCommittedValue(rowId, valueBytes, rowTimestamp);
             }
 
             // Let's check if there is more recent write. If it is a write-intent, then return write-intent.
@@ -732,7 +734,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
             if (invalid(seekIterator)) {
                 // There is no more recent commits or write-intents.
-                return wrapCommittedValue(valueBytes, rowTimestamp);
+                return wrapCommittedValue(rowId, valueBytes, rowTimestamp);
             }
 
             foundKeyBuf.position(0).limit(MAX_KEY_SIZE);
@@ -740,16 +742,16 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
             if (!matches(rowId, foundKeyBuf)) {
                 // There is no more recent commits or write-intents under this row id.
-                return wrapCommittedValue(valueBytes, rowTimestamp);
+                return wrapCommittedValue(rowId, valueBytes, rowTimestamp);
             }
 
             boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
 
             if (isWriteIntent) {
-                return wrapUncommittedValue(seekIterator.value(), rowTimestamp);
+                return wrapUncommittedValue(rowId, seekIterator.value(), rowTimestamp);
             }
 
-            return wrapCommittedValue(valueBytes, readTimestamp(foundKeyBuf));
+            return wrapCommittedValue(rowId, valueBytes, readTimestamp(foundKeyBuf));
         }
     }
 
@@ -875,10 +877,10 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         buf.position(0);
     }
 
-    private RowId getRowId(ByteBuffer buffer) {
-        buffer.position(ROW_ID_OFFSET);
+    private RowId getRowId(ByteBuffer keyBuffer) {
+        keyBuffer.position(ROW_ID_OFFSET);
 
-        return new RowId(partitionId, normalize(buffer.getLong()), normalize(buffer.getLong()));
+        return new RowId(partitionId, normalize(keyBuffer.getLong()), normalize(keyBuffer.getLong()));
     }
 
     @Override
@@ -1037,11 +1039,12 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
      * Converts raw byte array representation of the write-intent value into a read result adding newest commit timestamp if
      * it is not {@code null}.
      *
+     * @param rowId ID of the corresponding row.
      * @param valueBytes Value bytes as read from the storage.
      * @param newestCommitTs Commit timestamp of the most recent committed write of this value.
      * @return Read result instance.
      */
-    private static ReadResult wrapUncommittedValue(byte[] valueBytes, @Nullable HybridTimestamp newestCommitTs) {
+    private static ReadResult wrapUncommittedValue(RowId rowId, byte[] valueBytes, @Nullable HybridTimestamp newestCommitTs) {
         UUID txId = bytesToUuid(valueBytes, TX_ID_OFFSET);
         UUID commitTableId = bytesToUuid(valueBytes, TABLE_ID_OFFSET);
         int commitPartitionId = GridUnsafe.getShort(valueBytes, GridUnsafe.BYTE_ARR_OFF + PARTITION_ID_OFFSET) & 0xFFFF;
@@ -1054,22 +1057,23 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
             row = new ByteBufferRow(ByteBuffer.wrap(valueBytes).position(VALUE_OFFSET).slice().order(BINARY_ROW_BYTE_ORDER));
         }
 
-        return ReadResult.createFromWriteIntent(row, txId, commitTableId, commitPartitionId, newestCommitTs);
+        return ReadResult.createFromWriteIntent(rowId, row, txId, commitTableId, commitPartitionId, newestCommitTs);
     }
 
     /**
      * Converts raw byte array representation of the value into a read result.
      *
+     * @param rowId ID of the corresponding row.
      * @param valueBytes Value bytes as read from the storage.
      * @param rowCommitTimestamp Timestamp with which the row was committed.
      * @return Read result instance or {@code null} if value is a tombstone.
      */
-    private static ReadResult wrapCommittedValue(byte[] valueBytes, HybridTimestamp rowCommitTimestamp) {
+    private static ReadResult wrapCommittedValue(RowId rowId, byte[] valueBytes, HybridTimestamp rowCommitTimestamp) {
         if (isTombstone(valueBytes, false)) {
-            return ReadResult.EMPTY;
+            return ReadResult.empty(rowId);
         }
 
-        return ReadResult.createFromCommitted(new ByteBufferRow(valueBytes), rowCommitTimestamp);
+        return ReadResult.createFromCommitted(rowId, new ByteBufferRow(valueBytes), rowCommitTimestamp);
     }
 
     /**
@@ -1251,9 +1255,9 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
                 if (!isWriteIntent) {
                     // There is no write-intent, return latest committed row.
-                    readResult = wrapCommittedValue(valueBytes, readTimestamp(currentKeyBuffer));
+                    readResult = wrapCommittedValue(rowId, valueBytes, readTimestamp(currentKeyBuffer));
                 } else {
-                    readResult = wrapUncommittedValue(valueBytes, nextCommitTimestamp);
+                    readResult = wrapUncommittedValue(rowId, valueBytes, nextCommitTimestamp);
                 }
 
                 if (!readResult.isEmpty() || readResult.isWriteIntent()) {
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
index f36e1bc0b2..6a3bd4572c 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.PartitionTimestampCursor;
 import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
@@ -385,7 +386,7 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest
             when(cursor.hasNext()).thenAnswer(hnInvocation -> cursorTouchCnt.get() < submittedItems.size());
 
             lenient().when(cursor.next()).thenAnswer(ninvocation ->
-                    ReadResult.createFromCommitted(submittedItems.get(cursorTouchCnt.getAndIncrement()), clock.now()));
+                    ReadResult.createFromCommitted(new RowId(0), submittedItems.get(cursorTouchCnt.getAndIncrement()), clock.now()));
 
             return cursor;
         });
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
index 64d752b27b..849da6801b 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
@@ -102,8 +102,9 @@ class OutgoingSnapshotMvDataStreamingTest {
 
     @Test
     void sendsCommittedAndUncommittedVersionsFromStorage() {
-        ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
+        ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
         ReadResult version2 = ReadResult.createFromWriteIntent(
+                rowId1,
                 new ByteBufferRow(new byte[]{2}),
                 transactionId,
                 commitTableId,
@@ -155,8 +156,8 @@ class OutgoingSnapshotMvDataStreamingTest {
 
     @Test
     void reversesOrderOfVersionsObtainedFromPartition() {
-        ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
-        ReadResult version2 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{2}), clock.now());
+        ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
+        ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now());
 
         configureStorageToHaveExactlyOneRowWith(List.of(version1, version2));
 
@@ -174,8 +175,8 @@ class OutgoingSnapshotMvDataStreamingTest {
 
     @Test
     void iteratesRowsInPartition() {
-        ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
-        ReadResult version2 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{2}), clock.now());
+        ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
+        ReadResult version2 = ReadResult.createFromCommitted(rowId2, new ByteBufferRow(new byte[]{2}), clock.now());
 
         when(mvPartitionStorage.closestRowId(lowestRowId)).thenReturn(rowId1);
         when(mvPartitionStorage.scanVersions(rowId1)).thenReturn(Cursor.fromIterable(List.of(version1)));
@@ -213,8 +214,9 @@ class OutgoingSnapshotMvDataStreamingTest {
 
     @Test
     void sendsCommittedAndUncommittedVersionsFromQueue() {
-        ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
+        ReadResult version1 = ReadResult.createFromCommitted(rowIdOutOfOrder, new ByteBufferRow(new byte[]{1}), clock.now());
         ReadResult version2 = ReadResult.createFromWriteIntent(
+                rowIdOutOfOrder,
                 new ByteBufferRow(new byte[]{2}),
                 transactionId,
                 commitTableId,
@@ -257,8 +259,8 @@ class OutgoingSnapshotMvDataStreamingTest {
 
     @Test
     void sendsOutOfOrderRowsWithHighestPriority() {
-        ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
-        ReadResult version2 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{2}), clock.now());
+        ReadResult version1 = ReadResult.createFromCommitted(rowIdOutOfOrder, new ByteBufferRow(new byte[]{1}), clock.now());
+        ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now());
 
         when(mvPartitionStorage.scanVersions(rowIdOutOfOrder)).thenReturn(Cursor.fromIterable(List.of(version1)));
 
@@ -298,7 +300,7 @@ class OutgoingSnapshotMvDataStreamingTest {
     }
 
     @Test
-    void closesVersionsCursor() throws Exception {
+    void closesVersionsCursor() {
         @SuppressWarnings("unchecked")
         Cursor<ReadResult> cursor = mock(Cursor.class);
 
@@ -319,7 +321,7 @@ class OutgoingSnapshotMvDataStreamingTest {
 
     @Test
     void sendsTombstonesWithNullBuffers() {
-        ReadResult version = ReadResult.createFromCommitted(null, clock.now());
+        ReadResult version = ReadResult.createFromCommitted(rowId1, null, clock.now());
 
         configureStorageToHaveExactlyOneRowWith(List.of(version));
 
@@ -331,6 +333,7 @@ class OutgoingSnapshotMvDataStreamingTest {
     @Test
     void doesNotSendWriteIntentTimestamp() {
         ReadResult version = ReadResult.createFromWriteIntent(
+                rowId1,
                 new ByteBufferRow(new byte[]{1}),
                 transactionId,
                 commitTableId,
@@ -356,8 +359,8 @@ class OutgoingSnapshotMvDataStreamingTest {
 
     @Test
     void mvDataHandlingRespectsBatchSizeHintForMessagesFromPartition() {
-        ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
-        ReadResult version2 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{2}), clock.now());
+        ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
+        ReadResult version2 = ReadResult.createFromCommitted(rowId2, new ByteBufferRow(new byte[]{2}), clock.now());
 
         when(mvPartitionStorage.closestRowId(lowestRowId)).thenReturn(rowId1);
         when(mvPartitionStorage.scanVersions(rowId1)).thenReturn(Cursor.fromIterable(List.of(version1)));
@@ -371,8 +374,8 @@ class OutgoingSnapshotMvDataStreamingTest {
 
     @Test
     void mvDataHandlingRespectsBatchSizeHintForOutOfOrderMessages() {
-        ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
-        ReadResult version2 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{2}), clock.now());
+        ReadResult version1 = ReadResult.createFromCommitted(rowIdOutOfOrder, new ByteBufferRow(new byte[]{1}), clock.now());
+        ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now());
 
         when(mvPartitionStorage.scanVersions(rowIdOutOfOrder)).thenReturn(Cursor.fromIterable(List.of(version1)));
 
@@ -395,8 +398,8 @@ class OutgoingSnapshotMvDataStreamingTest {
 
     @Test
     void mvDataResponseThatIsNotLastHasFinishFalse() {
-        ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
-        ReadResult version2 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{2}), clock.now());
+        ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
+        ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now());
 
         when(mvPartitionStorage.closestRowId(lowestRowId)).thenReturn(rowId1);
         when(mvPartitionStorage.scanVersions(rowId1)).thenReturn(Cursor.fromIterable(List.of(version1, version2)));
@@ -409,7 +412,7 @@ class OutgoingSnapshotMvDataStreamingTest {
 
     @Test
     void sendsRowsFromPartitionBiggerThanHint() {
-        ReadResult version = ReadResult.createFromCommitted(new ByteBufferRow(new byte[1000]), clock.now());
+        ReadResult version = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[1000]), clock.now());
 
         configureStorageToHaveExactlyOneRowWith(List.of(version));
 
@@ -422,7 +425,7 @@ class OutgoingSnapshotMvDataStreamingTest {
 
     @Test
     void sendsRowsFromOutOfOrderQueueBiggerThanHint() {
-        ReadResult version = ReadResult.createFromCommitted(new ByteBufferRow(new byte[1000]), clock.now());
+        ReadResult version = ReadResult.createFromCommitted(rowIdOutOfOrder, new ByteBufferRow(new byte[1000]), clock.now());
 
         when(mvPartitionStorage.scanVersions(rowIdOutOfOrder)).thenReturn(Cursor.fromIterable(List.of(version)));
 
@@ -456,8 +459,8 @@ class OutgoingSnapshotMvDataStreamingTest {
 
     @Test
     void lastSentRowIdIsPassed() {
-        ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
-        ReadResult version2 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{2}), clock.now());
+        ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
+        ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now());
 
         when(mvPartitionStorage.closestRowId(lowestRowId)).thenReturn(rowId1);
         when(mvPartitionStorage.scanVersions(rowId1)).thenReturn(Cursor.fromIterable(List.of(version1, version2)));
@@ -476,8 +479,8 @@ class OutgoingSnapshotMvDataStreamingTest {
 
     @Test
     void notYetSentRowIdIsNotPassed() {
-        ReadResult version1 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
-        ReadResult version2 = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{2}), clock.now());
+        ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
+        ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now());
 
         when(mvPartitionStorage.closestRowId(lowestRowId)).thenReturn(rowId1);
         when(mvPartitionStorage.scanVersions(rowId1)).thenReturn(Cursor.fromIterable(List.of(version1, version2)));
@@ -496,7 +499,7 @@ class OutgoingSnapshotMvDataStreamingTest {
 
     @Test
     void anyRowIdIsPassedForFinishedSnapshot() {
-        ReadResult version = ReadResult.createFromCommitted(new ByteBufferRow(new byte[]{1}), clock.now());
+        ReadResult version = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
 
         configureStorageToHaveExactlyOneRowWith(List.of(version));