You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/10/13 13:31:12 UTC

[ignite-3] branch main updated: IGNITE-17254 Storage API for RAFT snapshot streaming (#1194)

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 3fb52c5b8e IGNITE-17254 Storage API for RAFT snapshot streaming (#1194)
3fb52c5b8e is described below

commit 3fb52c5b8e47895d9df93b25e22c049797baf09a
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Thu Oct 13 17:31:06 2022 +0400

    IGNITE-17254 Storage API for RAFT snapshot streaming (#1194)
---
 .../internal/storage/MvPartitionStorage.java       |  23 ++++-
 .../apache/ignite/internal/storage/ReadResult.java |  42 ++++++--
 .../storage/AbstractMvPartitionStorageTest.java    | 106 ++++++++++++++++++++-
 .../internal/storage/BaseMvStoragesTest.java       |   8 +-
 .../storage/impl/TestMvPartitionStorage.java       |  63 +++++++-----
 .../mv/AbstractPageMemoryMvPartitionStorage.java   |  71 ++++++++++++--
 .../internal/storage/pagememory/mv/RowVersion.java |   7 ++
 .../storage/pagememory/mv/VersionChain.java        |   3 +-
 .../storage/rocksdb/RocksDbMvPartitionStorage.java |  72 ++++++++++----
 .../rocksdb/RocksDbMvPartitionStorageTest.java     |   8 ++
 .../distributed/ItInternalTableScanTest.java       |   5 +-
 11 files changed, 332 insertions(+), 76 deletions(-)

diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index 0c0ee96dee..2abd01450a 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -98,6 +98,9 @@ public interface MvPartitionStorage extends AutoCloseable {
      *     <li>There are commits but they're all newer than timestamp - return nothing.</li>
      * </ol>
      *
+     * <p>{@link ReadResult#newestCommitTimestamp()} is filled by this method for intents having preceding committed
+     * versions.
+     *
      * @param rowId Row id.
      * @param timestamp Timestamp.
      * @return Read result that corresponds to the key.
@@ -142,12 +145,30 @@ public interface MvPartitionStorage extends AutoCloseable {
      */
     void commitWrite(RowId rowId, HybridTimestamp timestamp) throws StorageException;
 
+    /**
+     * Creates a committed version. In detail:
+     * <ul>
+     *     <li>if there is no uncommitted version, a new committed version is added;</li>
+     *     <li>if there is an uncommitted version, this method may fail with a {@link StorageException} (this method should
+     *     not be called if there is already something uncommitted for the given row).</li></ul>
+     *
+     * @param rowId Row id.
+     * @param row Binary row to update. Key only row means value removal.
+     * @param commitTimestamp Timestamp to associate with committed value.
+     * @throws StorageException If failed to write data to the storage.
+     */
+    void addWriteCommitted(RowId rowId, BinaryRow row, HybridTimestamp commitTimestamp) throws StorageException;
+
     /**
      * Scans all versions of a single row.
      *
+     * <p>{@link ReadResult#newestCommitTimestamp()} is NOT filled by this method for intents having preceding committed
+     * versions.
+     *
      * @param rowId Row id.
+     * @return Cursor of results including both rows data and transaction-related context. The versions are ordered from newest to oldest.
      */
-    Cursor<BinaryRow> scanVersions(RowId rowId) throws StorageException;
+    Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException;
 
     /**
      * Scans the partition and returns a cursor of values at the given timestamp.
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
index a1d912c6b2..41819efe8e 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
@@ -30,7 +30,7 @@ public class ReadResult {
     public static final int UNDEFINED_COMMIT_PARTITION_ID = -1;
 
     /** Empty read result. */
-    public static final ReadResult EMPTY = new ReadResult(null, null, null, null, UNDEFINED_COMMIT_PARTITION_ID);
+    public static final ReadResult EMPTY = new ReadResult(null, null, null, null, null, UNDEFINED_COMMIT_PARTITION_ID);
 
     /** Data. */
     private final @Nullable BinaryRow binaryRow;
@@ -45,13 +45,26 @@ public class ReadResult {
     private final int commitPartitionId;
 
     /**
-     * Timestamp of the newest commit of the data. Not {@code null} iff committed version exists, this is a
+     * Commit timestamp of this version (if it exists). Non-null for committed versions, {@code null} for write intents.
+     */
+    private final @Nullable HybridTimestamp commitTs;
+
+    /**
+     * Timestamp of the newest commit of the data. Not {@code null} if committed version exists, this is a
      * write-intent and read was made with a timestamp.
+     * Might be {@code null} for {@link MvPartitionStorage#scanVersions(RowId)} even for write intents having
+     * a preceding committed version.
      */
     private final @Nullable HybridTimestamp newestCommitTs;
 
-    private ReadResult(BinaryRow binaryRow, @Nullable UUID transactionId, @Nullable UUID commitTableId,
-            @Nullable HybridTimestamp newestCommitTs, int commitPartitionId) {
+    private ReadResult(
+            @Nullable BinaryRow binaryRow,
+            @Nullable UUID transactionId,
+            @Nullable UUID commitTableId,
+            @Nullable HybridTimestamp commitTs,
+            @Nullable HybridTimestamp newestCommitTs,
+            int commitPartitionId
+    ) {
         this.binaryRow = binaryRow;
 
         // If transaction is not null, then commitTableId and commitPartitionId should be defined.
@@ -62,17 +75,18 @@ public class ReadResult {
 
         this.transactionId = transactionId;
         this.commitTableId = commitTableId;
+        this.commitTs = commitTs;
         this.newestCommitTs = newestCommitTs;
         this.commitPartitionId = commitPartitionId;
     }
 
     public static ReadResult createFromWriteIntent(BinaryRow binaryRow, UUID transactionId, UUID commitTableId,
-            @Nullable HybridTimestamp lastCommittedTimestamp, int commitPartitionId) {
-        return new ReadResult(binaryRow, transactionId, commitTableId, lastCommittedTimestamp, commitPartitionId);
+            int commitPartitionId, @Nullable HybridTimestamp lastCommittedTimestamp) {
+        return new ReadResult(binaryRow, transactionId, commitTableId, null, lastCommittedTimestamp, commitPartitionId);
     }
 
-    public static ReadResult createFromCommitted(BinaryRow binaryRow) {
-        return new ReadResult(binaryRow, null, null, null, UNDEFINED_COMMIT_PARTITION_ID);
+    public static ReadResult createFromCommitted(BinaryRow binaryRow, HybridTimestamp commitTs) {
+        return new ReadResult(binaryRow, null, null, commitTs, null, UNDEFINED_COMMIT_PARTITION_ID);
     }
 
     /**
@@ -107,7 +121,17 @@ public class ReadResult {
     }
 
     /**
-     * Returns timestamp of the most recent commit of the row.
+     * Returns the commit timestamp of this version (if it exists). Non-null for committed versions, {@code null} for write intents.
+     *
+     * @return Commit timestamp of this version (if it exists). Non-null for committed versions, {@code null} for write intents.
+     */
+    public @Nullable HybridTimestamp commitTimestamp() {
+        return commitTs;
+    }
+
+    /**
+     * Returns timestamp of the most recent commit of the row. Might be {@code null} for {@link MvPartitionStorage#scanVersions(RowId)}
+     * even for write intents having a preceding committed version.
      *
      * @return Timestamp of the most recent commit of the row.
      */
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index c687068f24..dccd12b700 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItems;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -73,19 +74,19 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
     private final BinaryRow binaryRow3 = binaryRow(key, new TestValue(22, "bar3"));
 
     /**
-     * Reads a row inside of consistency closure.
+     * Reads a row.
      */
     protected BinaryRow read(RowId rowId, HybridTimestamp timestamp) {
-        ReadResult readResult = storage.runConsistently(() -> storage.read(rowId, timestamp));
+        ReadResult readResult = storage.read(rowId, timestamp);
 
         return readResult.binaryRow();
     }
 
     /**
-     * Scans partition inside of consistency closure.
+     * Scans partition.
      */
     protected PartitionTimestampCursor scan(Predicate<BinaryRow> filter, HybridTimestamp timestamp) {
-        return storage.runConsistently(() -> storage.scan(filter, timestamp));
+        return storage.scan(filter, timestamp);
     }
 
     /**
@@ -117,6 +118,18 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
         });
     }
 
+    /**
+     * Writes a row to storage as if it were first added using {@link MvPartitionStorage#addWrite(RowId, BinaryRow, UUID, UUID, int)}
+     * and then immediately committed with {@link MvPartitionStorage#commitWrite(RowId, HybridTimestamp)}.
+     */
+    protected void addWriteCommitted(RowId rowId, BinaryRow row, HybridTimestamp commitTimestamp) {
+        storage.runConsistently(() -> {
+            storage.addWriteCommitted(rowId, row, commitTimestamp);
+
+            return null;
+        });
+    }
+
     /**
      * Aborts write-intent inside of consistency closure.
      */
@@ -602,6 +615,15 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
         assertThat(returnedRow, is(nullValue()));
     }
 
+    @Test
+    void addWriteCreatesUncommittedVersion() {
+        RowId rowId = insert(binaryRow, txId);
+
+        ReadResult readResult = storage.read(rowId, clock.now());
+
+        assertTrue(readResult.isWriteIntent());
+    }
+
     @Test
     void afterRemovalReadWithTxIdFindsNothing() {
         RowId rowId = insert(binaryRow, newTransactionId());
@@ -660,6 +682,16 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
         assertThat(rowFromRemoval, is(nullValue()));
     }
 
+    @Test
+    void commitWriteCommitsWriteIntentVersion() {
+        RowId rowId = insert(binaryRow, txId);
+        commitWrite(rowId, clock.now());
+
+        ReadResult readResult = storage.read(rowId, clock.now());
+
+        assertFalse(readResult.isWriteIntent());
+    }
+
     @Test
     void commitWriteMakesVersionAvailableToReadByTimestamp() {
         RowId rowId = insert(binaryRow, txId);
@@ -1138,6 +1170,72 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
         assertNull(storage.closestRowId(rowId2.increment()));
     }
 
+    @Test
+    public void addWriteCommittedAddsCommittedVersion() {
+        RowId rowId = new RowId(PARTITION_ID);
+
+        addWriteCommitted(rowId, binaryRow, clock.now());
+
+        // Reading with a timestamp returns the committed version.
+        assertRowMatches(storage.read(rowId, clock.now()).binaryRow(), binaryRow);
+    }
+
+    @Test
+    public void addWriteCommittedLeavesExistingCommittedVersionsUntouched() {
+        RowId rowId = new RowId(PARTITION_ID);
+
+        HybridTimestamp ts1 = clock.now();
+
+        addWriteCommitted(rowId, binaryRow, ts1);
+        addWriteCommitted(rowId, binaryRow2, clock.now());
+
+        assertRowMatches(storage.read(rowId, clock.now()).binaryRow(), binaryRow2);
+        assertRowMatches(storage.read(rowId, ts1).binaryRow(), binaryRow);
+    }
+
+    @Test
+    public void addWriteCommittedThrowsIfUncommittedVersionExists() {
+        RowId rowId = insert(binaryRow, txId);
+
+        StorageException ex = assertThrows(StorageException.class, () -> addWriteCommitted(rowId, binaryRow2, clock.now()));
+        assertThat(ex.getMessage(), is("Write intent exists for " + rowId));
+    }
+
+    @Test
+    public void scanVersionsReturnsUncommittedVersionsAsUncommitted() throws Exception {
+        RowId rowId = insert(binaryRow, txId);
+        commitWrite(rowId, clock.now());
+        addWrite(rowId, binaryRow2, newTransactionId());
+
+        try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
+            ReadResult result = cursor.next();
+
+            assertTrue(result.isWriteIntent());
+            assertThat(result.commitPartitionId(), is(not(ReadResult.UNDEFINED_COMMIT_PARTITION_ID)));
+            assertThat(result.commitTableId(), is(notNullValue()));
+            assertThat(result.transactionId(), is(notNullValue()));
+            assertThat(result.commitTimestamp(), is(nullValue()));
+            assertThat(result.newestCommitTimestamp(), is(nullValue()));
+        }
+    }
+
+    @Test
+    public void scanVersionsReturnsCommittedVersionsAsCommitted() throws Exception {
+        RowId rowId = insert(binaryRow, txId);
+        commitWrite(rowId, clock.now());
+
+        try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
+            ReadResult result = cursor.next();
+
+            assertFalse(result.isWriteIntent());
+            assertThat(result.commitPartitionId(), is(ReadResult.UNDEFINED_COMMIT_PARTITION_ID));
+            assertThat(result.commitTableId(), is(nullValue()));
+            assertThat(result.transactionId(), is(nullValue()));
+            assertThat(result.commitTimestamp(), is(notNullValue()));
+            assertThat(result.newestCommitTimestamp(), is(nullValue()));
+        }
+    }
+
     /**
      * Returns row id that is lexicographically smaller (by the value of one) than the argument.
      *
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
index 6319179fe9..4c44aff251 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
@@ -129,13 +129,7 @@ public abstract class BaseMvStoragesTest {
         return new IgniteBiTuple<>(key(binaryRow), value(binaryRow));
     }
 
-    protected static List<IgniteBiTuple<TestKey, TestValue>> toList(Cursor<BinaryRow> cursor) throws Exception {
-        try (cursor) {
-            return cursor.stream().map(BaseMvStoragesTest::unwrap).collect(Collectors.toList());
-        }
-    }
-
-    protected static List<IgniteBiTuple<TestKey, TestValue>> toList(PartitionTimestampCursor cursor) throws Exception {
+    protected static List<IgniteBiTuple<TestKey, TestValue>> toList(Cursor<ReadResult> cursor) throws Exception {
         try (cursor) {
             return cursor.stream().map(BaseMvStoragesTest::unwrap).collect(Collectors.toList());
         }
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index ff64394254..24a18b3466 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -175,6 +175,18 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
         });
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public void addWriteCommitted(RowId rowId, BinaryRow row, HybridTimestamp commitTimestamp) throws StorageException {
+        map.compute(rowId, (ignored, versionChain) -> {
+            if (versionChain != null && versionChain.isWriteIntent()) {
+                throw new StorageException("Write intent exists for " + rowId);
+            }
+
+            return new VersionChain(row, commitTimestamp, null, null, ReadResult.UNDEFINED_COMMIT_PARTITION_ID, versionChain);
+        });
+    }
+
     /** {@inheritDoc} */
     @Override
     public ReadResult read(RowId rowId, @Nullable HybridTimestamp timestamp) {
@@ -211,9 +223,8 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
 
         if (timestamp == null) {
             // Search by transaction id.
-            BinaryRow binaryRow = versionChain.row;
 
-            if (filter != null && !filter.test(binaryRow)) {
+            if (filter != null && !filter.test(versionChain.row)) {
                 return ReadResult.EMPTY;
             }
 
@@ -221,24 +232,12 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
                 throw new TxIdMismatchException(txId, versionChain.txId);
             }
 
-            boolean isWriteIntent = versionChain.ts == null;
-
-            if (isWriteIntent) {
-                return ReadResult.createFromWriteIntent(
-                        binaryRow,
-                        versionChain.txId,
-                        versionChain.commitTableId,
-                        versionChain.next != null ? versionChain.next.ts : null,
-                        versionChain.commitPartitionId
-                );
-            }
-
-            return ReadResult.createFromCommitted(binaryRow);
+            return versionChainToReadResult(versionChain, true);
         }
 
         VersionChain cur = versionChain;
 
-        if (cur.ts == null) {
+        if (cur.isWriteIntent()) {
             // We have a write-intent.
             if (cur.next == null) {
                 // We *only* have a write-intent, return it.
@@ -248,8 +247,8 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
                     return ReadResult.EMPTY;
                 }
 
-                return ReadResult.createFromWriteIntent(binaryRow, cur.txId, cur.commitTableId, null,
-                        cur.commitPartitionId);
+                return ReadResult.createFromWriteIntent(binaryRow, cur.txId, cur.commitTableId, cur.commitPartitionId, null
+                );
             }
 
             // Move to first commit.
@@ -259,6 +258,19 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
         return walkVersionChain(versionChain, timestamp, filter, cur);
     }
 
+    private static ReadResult versionChainToReadResult(VersionChain versionChain, boolean fillLastCommittedTs) {
+        if (versionChain.isWriteIntent()) {
+            return ReadResult.createFromWriteIntent(
+                    versionChain.row,
+                    versionChain.txId,
+                    versionChain.commitTableId,
+                    versionChain.commitPartitionId, fillLastCommittedTs && versionChain.next != null ? versionChain.next.ts : null
+            );
+        }
+
+        return ReadResult.createFromCommitted(versionChain.row, versionChain.ts);
+    }
+
     /**
      * Walks version chain to find a row by timestamp. See {@link MvPartitionStorage#read(RowId, HybridTimestamp)} for details.
      *
@@ -281,8 +293,9 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
                 return ReadResult.EMPTY;
             }
 
-            return ReadResult.createFromWriteIntent(binaryRow, chainHead.txId, chainHead.commitTableId, firstCommit.ts,
-                    chainHead.commitPartitionId);
+            return ReadResult.createFromWriteIntent(binaryRow, chainHead.txId, chainHead.commitTableId, chainHead.commitPartitionId,
+                    firstCommit.ts
+            );
         }
 
         VersionChain cur = firstCommit;
@@ -298,7 +311,7 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
                     return ReadResult.EMPTY;
                 }
 
-                return ReadResult.createFromCommitted(binaryRow);
+                return ReadResult.createFromCommitted(binaryRow, cur.ts);
             }
 
             cur = cur.next;
@@ -308,8 +321,12 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
     }
 
     @Override
-    public Cursor<BinaryRow> scanVersions(RowId rowId) throws StorageException {
-        return Cursor.fromIterator(Stream.iterate(map.get(rowId), Objects::nonNull, vc -> vc.next).map(vc -> vc.row).iterator());
+    public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException {
+        return Cursor.fromIterator(
+                Stream.iterate(map.get(rowId), Objects::nonNull, vc -> vc.next)
+                        .map((VersionChain versionChain) -> versionChainToReadResult(versionChain, false))
+                        .iterator()
+        );
     }
 
     /** {@inheritDoc} */
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 0e8aba6174..f9da31bd26 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -141,8 +141,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
      * Starts a partition by initializing its internal structures.
      */
     public void start() {
-        try {
-            Cursor<IndexMeta> cursor = indexMetaTree.find(null, null);
+        try (Cursor<IndexMeta> cursor = indexMetaTree.find(null, null)) {
 
             NamedListView<TableIndexView> indexesCfgView = tablesConfiguration.indexes().value();
 
@@ -161,7 +160,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
                     //TODO IGNITE-17626 Drop the index synchronously.
                 }
             }
-        } catch (IgniteInternalCheckedException e) {
+        } catch (Exception e) {
             throw new StorageException("Failed to process SQL indexes during the partition start", e);
         }
     }
@@ -393,7 +392,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
                     row = new ByteBufferRow(rowVersion.value());
                 }
 
-                return ReadResult.createFromWriteIntent(row, transactionId, commitTableId, null, commitPartitionId);
+                return ReadResult.createFromWriteIntent(row, transactionId, commitTableId, commitPartitionId, null);
             }
         }
 
@@ -443,7 +442,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
                 row = new ByteBufferRow(rowVersion.value());
             }
 
-            return ReadResult.createFromWriteIntent(row, transactionId, commitTableId, firstCommit.timestamp(), commitPartitionId);
+            return ReadResult.createFromWriteIntent(row, transactionId, commitTableId, commitPartitionId, firstCommit.timestamp());
         }
 
         RowVersion curCommit = firstCommit;
@@ -463,7 +462,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
                     row = new ByteBufferRow(curCommit.value());
                 }
 
-                return ReadResult.createFromCommitted(row);
+                return ReadResult.createFromCommitted(row, curCommit.timestamp());
             }
 
             if (!curCommit.hasNextLink()) {
@@ -477,8 +476,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
     }
 
     private RowVersion insertRowVersion(@Nullable BinaryRow row, long nextPartitionlessLink) {
-        // TODO IGNITE-16913 Add proper way to write row bytes into array without allocations.
-        byte[] rowBytes = row == null ? TOMBSTONE_PAYLOAD : row.bytes();
+        byte[] rowBytes = rowBytes(row);
 
         RowVersion rowVersion = new RowVersion(partitionId, nextPartitionlessLink, ByteBuffer.wrap(rowBytes));
 
@@ -495,6 +493,11 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         }
     }
 
+    private static byte[] rowBytes(@Nullable BinaryRow row) {
+        // TODO IGNITE-16913 Add proper way to write row bytes into array without allocations.
+        return row == null ? TOMBSTONE_PAYLOAD : row.bytes();
+    }
+
     @Override
     public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, UUID commitTableId, int commitPartitionId)
             throws TxIdMismatchException, StorageException {
@@ -627,7 +630,37 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
     }
 
     @Override
-    public Cursor<BinaryRow> scanVersions(RowId rowId) throws StorageException {
+    public void addWriteCommitted(RowId rowId, BinaryRow row, HybridTimestamp commitTimestamp) throws StorageException {
+        assert rowId.partitionId() == partitionId : rowId;
+
+        VersionChain currentChain = findVersionChain(rowId);
+
+        if (currentChain != null && currentChain.isUncommitted()) {
+            // This means that there is a bug in our code as the caller must make sure that no write intent exists
+            // below this write.
+            throw new StorageException("Write intent exists for " + rowId);
+        }
+
+        long nextLink = currentChain == null ? NULL_LINK : currentChain.newestCommittedLink();
+        RowVersion newVersion = insertCommittedRowVersion(row, commitTimestamp, nextLink);
+
+        VersionChain chainReplacement = VersionChain.createCommitted(rowId, newVersion.link(), newVersion.nextLink());
+
+        updateVersionChain(chainReplacement);
+    }
+
+    private RowVersion insertCommittedRowVersion(BinaryRow row, HybridTimestamp commitTimestamp, long nextPartitionlessLink) {
+        byte[] rowBytes = rowBytes(row);
+
+        RowVersion rowVersion = new RowVersion(partitionId, commitTimestamp, nextPartitionlessLink, ByteBuffer.wrap(rowBytes));
+
+        insertRowVersion(rowVersion);
+
+        return rowVersion;
+    }
+
+    @Override
+    public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException {
         try {
             VersionChain versionChain = versionChainTree.findOne(new VersionChainKey(rowId));
 
@@ -641,12 +674,30 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
                     rowVersion.nextLink() == 0 ? null : readRowVersion(rowVersion.nextLink(), ALWAYS_LOAD_VALUE)
             );
 
-            return Cursor.fromIterator(stream.map(rowVersion -> new ByteBufferRow(rowVersion.value())).iterator());
+            return Cursor.fromIterator(
+                    stream.map(rowVersion -> rowVersionToResultNotFillingLastCommittedTs(versionChain, rowVersion))
+                            .iterator()
+            );
         } catch (IgniteInternalCheckedException e) {
             throw new RuntimeException(e);
         }
     }
 
+    private static ReadResult rowVersionToResultNotFillingLastCommittedTs(VersionChain versionChain, RowVersion rowVersion) {
+        ByteBufferRow row = new ByteBufferRow(rowVersion.value());
+
+        if (rowVersion.isCommitted()) {
+            return ReadResult.createFromCommitted(row, rowVersion.timestamp());
+        } else {
+            return ReadResult.createFromWriteIntent(
+                    row,
+                    versionChain.transactionId(),
+                    versionChain.commitTableId(),
+                    versionChain.commitPartitionId(), null
+            );
+        }
+    }
+
     /**
      * Scans the partition and returns a cursor of values. All filtered values must either be uncommitted in the current transaction
      * or already committed in a different transaction.
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
index 501e89f319..c51d8b888b 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
@@ -64,6 +64,13 @@ public final class RowVersion implements Storable {
         this(partitionId, 0, null, nextLink, value);
     }
 
+    /**
+     * Constructor.
+     */
+    public RowVersion(int partitionId, HybridTimestamp commitTimestamp, long nextLink, ByteBuffer value) {
+        this(partitionId, 0, commitTimestamp, nextLink, value);
+    }
+
     /**
      * Constructor.
      */
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java
index ea02198564..b1b2a399e3 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java
@@ -21,6 +21,7 @@ import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
 
 import java.util.UUID;
 import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.tostring.S;
 import org.jetbrains.annotations.Nullable;
@@ -62,7 +63,7 @@ public class VersionChain extends VersionChainKey {
     }
 
     public static VersionChain createCommitted(RowId rowId, long headLink, long nextLink) {
-        return new VersionChain(rowId, null, null, -1, headLink, nextLink);
+        return new VersionChain(rowId, null, null, ReadResult.UNDEFINED_COMMIT_PARTITION_ID, headLink, nextLink);
     }
 
     public static VersionChain createUncommitted(RowId rowId, UUID transactionId, UUID commitTableId, int commitPartitionId, long headLink,
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 4352a306aa..aba18f52b1 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -318,7 +318,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     @Override
     public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, UUID commitTableId, int commitPartitionId)
             throws TxIdMismatchException, StorageException {
-        WriteBatchWithIndex writeBatch = requireWriteBatch();
+        @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = requireWriteBatch();
 
         ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
 
@@ -373,10 +373,9 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
      */
     private void writeUnversioned(byte[] keyArray, BinaryRow row, UUID txId, UUID commitTableId, int commitPartitionId)
             throws RocksDBException {
-        WriteBatchWithIndex writeBatch = requireWriteBatch();
+        @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = requireWriteBatch();
 
-        //TODO IGNITE-16913 Add proper way to write row bytes into array without allocations.
-        byte[] rowBytes = row.bytes();
+        byte[] rowBytes = rowBytes(row);
 
         ByteBuffer value = ByteBuffer.allocate(rowBytes.length + VALUE_HEADER_SIZE);
         byte[] array = value.array();
@@ -391,6 +390,11 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         writeBatch.put(cf, copyOf(keyArray, ROW_PREFIX_SIZE), value.array());
     }
 
+    private static byte[] rowBytes(BinaryRow row) {
+        //TODO IGNITE-16913 Add proper way to write row bytes into array without allocations.
+        return row.bytes();
+    }
+
     /** {@inheritDoc} */
     @Override
     public @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException {
@@ -447,6 +451,23 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         }
     }
 
+    @Override
+    public void addWriteCommitted(RowId rowId, BinaryRow row, HybridTimestamp commitTimestamp) throws StorageException {
+        @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = requireWriteBatch();
+
+        ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+        putTimestamp(keyBuf, commitTimestamp);
+
+        // The commit timestamp is part of the key, so the value is the plain row bytes without a value header.
+        byte[] rowBytes = rowBytes(row);
+
+        try {
+            writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), rowBytes);
+        } catch (RocksDBException e) {
+            throw new StorageException("Failed to update a row in storage", e);
+        }
+    }
+
     /**
      * Reads either the committed value from the storage or the uncommitted value belonging to given transaction.
      *
@@ -519,7 +540,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
                 if (!isWriteIntent) {
                     // There is no write-intent, return latest committed row.
-                    return wrapCommittedValue(valueBytes);
+                    return wrapCommittedValue(valueBytes, readTimestamp(readKeyBuf));
                 }
 
                 assert valueBytes != null;
@@ -624,13 +645,13 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
             // Should not be write-intent, as we were seeking with the timestamp.
             assert keyLength == MAX_KEY_SIZE;
 
-            HybridTimestamp rowTimestamp = readTimestamp(foundKeyBuf, ROW_PREFIX_SIZE);
+            HybridTimestamp rowTimestamp = readTimestamp(foundKeyBuf);
 
             byte[] valueBytes = seekIterator.value();
 
             if (rowTimestamp.equals(timestamp)) {
                 // This is exactly the row we are looking for.
-                return wrapCommittedValue(valueBytes);
+                return wrapCommittedValue(valueBytes, rowTimestamp);
             }
 
             // Let's check if there is more recent write. If it is a write-intent, then return write-intent.
@@ -639,7 +660,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
             if (invalid(seekIterator)) {
                 // There is no more recent commits or write-intents.
-                return wrapCommittedValue(valueBytes);
+                return wrapCommittedValue(valueBytes, rowTimestamp);
             }
 
             foundKeyBuf.position(0).limit(MAX_KEY_SIZE);
@@ -647,7 +668,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
             if (!matches(rowId, foundKeyBuf)) {
                 // There is no more recent commits or write-intents under this row id.
-                return wrapCommittedValue(valueBytes);
+                return wrapCommittedValue(valueBytes, rowTimestamp);
             }
 
             boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
@@ -656,7 +677,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
                 return wrapUncommittedValue(seekIterator.value(), rowTimestamp);
             }
 
-            return wrapCommittedValue(valueBytes);
+            return wrapCommittedValue(valueBytes, rowTimestamp); // valueBytes was committed at rowTimestamp.
         }
     }
 
@@ -675,7 +696,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     }
 
     @Override
-    public Cursor<BinaryRow> scanVersions(RowId rowId) throws StorageException {
+    public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException {
         ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
 
         byte[] lowerBound = copyOf(keyBuf.array(), ROW_PREFIX_SIZE);
@@ -692,8 +713,18 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
         return new RocksIteratorAdapter<>(it) {
             @Override
-            protected BinaryRow decodeEntry(byte[] key, byte[] value) {
-                return wrapValueIntoBinaryRow(value, key.length == ROW_PREFIX_SIZE);
+            protected ReadResult decodeEntry(byte[] key, byte[] value) {
+                int keyLength = key.length;
+
+                boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
+
+                if (!isWriteIntent) {
+                    return wrapCommittedValue(value, readTimestamp(ByteBuffer.wrap(key).order(KEY_BYTE_ORDER)));
+                }
+
+                assert value != null;
+
+                return wrapUncommittedValue(value, null);
             }
 
             @Override
@@ -1147,11 +1178,11 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         buf.putInt(~ts.getLogical());
     }
 
-    private static HybridTimestamp readTimestamp(ByteBuffer buf, int off) {
-        assert buf.order() == KEY_BYTE_ORDER;
+    private static HybridTimestamp readTimestamp(ByteBuffer keyBuf) {
+        assert keyBuf.order() == KEY_BYTE_ORDER;
 
-        long physical = ~buf.getLong(off);
-        int logical = ~buf.getInt(off + Long.BYTES);
+        long physical = ~keyBuf.getLong(ROW_PREFIX_SIZE);
+        int logical = ~keyBuf.getInt(ROW_PREFIX_SIZE + Long.BYTES);
 
         return new HybridTimestamp(physical, logical);
     }
@@ -1226,21 +1257,22 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
             row = new ByteBufferRow(ByteBuffer.wrap(valueBytes).position(VALUE_OFFSET).slice().order(BINARY_ROW_BYTE_ORDER));
         }
 
-        return ReadResult.createFromWriteIntent(row, txId, commitTableId, newestCommitTs, commitPartitionId);
+        return ReadResult.createFromWriteIntent(row, txId, commitTableId, commitPartitionId, newestCommitTs);
     }
 
     /**
      * Converts raw byte array representation of the value into a read result.
      *
      * @param valueBytes Value bytes as read from the storage.
+     * @param rowCommitTimestamp Timestamp with which the row was committed.
      * @return Read result instance or {@code null} if value is a tombstone.
      */
-    private static ReadResult wrapCommittedValue(byte[] valueBytes) {
+    private static ReadResult wrapCommittedValue(byte[] valueBytes, HybridTimestamp rowCommitTimestamp) {
         if (isTombstone(valueBytes, false)) {
             return ReadResult.EMPTY;
         }
 
-        return ReadResult.createFromCommitted(new ByteBufferRow(valueBytes));
+        return ReadResult.createFromCommitted(new ByteBufferRow(valueBytes), rowCommitTimestamp);
     }
 
     /**
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
index 2d9addd3b1..eb7675d0fb 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
@@ -78,4 +78,12 @@ public class RocksDbMvPartitionStorageTest extends AbstractMvPartitionStorageTes
                 engine == null ? null : engine::stop
         );
     }
+
+    @Override
+    public void addWriteCommittedThrowsIfUncommittedVersionExists() {
+        // Overridden with an empty body on purpose: RocksDbMvPartitionStorage does not throw in this case. The
+        // exception is optional and only eases debugging, because the caller must already make sure that no write
+        // intent exists before calling addWriteCommitted(). Checking for a write intent is not cheap for
+        // RocksDbMvPartitionStorage, so it is not required to perform that check.
+    }
 }
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
index d360289abd..3c9ca940e4 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
@@ -41,6 +41,7 @@ import java.util.concurrent.Flow.Subscription;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
+import org.apache.ignite.hlc.HybridClock;
 import org.apache.ignite.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.schema.BinaryRow;
@@ -74,6 +75,8 @@ public class ItInternalTableScanTest {
     /** Internal table to test. */
     private InternalTable internalTbl;
 
+    private final HybridClock clock = new HybridClock();
+
     /**
      * Prepare test environment using DummyInternalTableImpl and Mocked storage.
      */
@@ -379,7 +382,7 @@ public class ItInternalTableScanTest {
             when(cursor.hasNext()).thenAnswer(hnInvocation -> cursorTouchCnt.get() < submittedItems.size());
 
             when(cursor.next()).thenAnswer(ninvocation ->
-                    ReadResult.createFromCommitted(submittedItems.get(cursorTouchCnt.getAndIncrement())));
+                    ReadResult.createFromCommitted(submittedItems.get(cursorTouchCnt.getAndIncrement()), clock.now()));
 
             return cursor;
         });