You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original archive page and is not reproduced in this text.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/10/26 16:55:48 UTC

[ignite-3] branch ignite-3.0.0-beta1 updated (3d5771c5ba -> c955bd8f82)

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a change to branch ignite-3.0.0-beta1
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


    from 3d5771c5ba IGNITE-17260 IgniteTransactions and Transaction interfaces enriched with RO related methods (#1248)
     new ca20e3c149 IGNITE-17864 Optimize scan(HybridTimestamp.MAX_VALUE) and read(HybridTimestamp.MAX_VALUE) (#1207)
     new c955bd8f82 IGNITE-17968 Fix write-intents being filtered out in case if it's a tombstone (#1252)

The 2 revisions listed above as "new" are entirely new to this
repository and are described in separate emails. Revisions
listed as "add" (none in this push) were already present in the
repository and have only been added to this reference.


Summary of changes:
 .../internal/storage/MvPartitionStorage.java       |   3 +-
 .../apache/ignite/internal/storage/ReadResult.java |  13 +-
 .../ignite/internal/storage/ReadResultTest.java}   |  16 +-
 .../storage/AbstractMvPartitionStorageTest.java    | 154 +++++-
 .../storage/impl/TestMvPartitionStorage.java       |   2 +-
 .../mv/AbstractPageMemoryMvPartitionStorage.java   | 249 +++------
 .../storage/rocksdb/RocksDbMvPartitionStorage.java | 614 ++++++++++-----------
 7 files changed, 534 insertions(+), 517 deletions(-)
 copy modules/{raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/UnlimitedBudgetTest.java => storage-api/src/test/java/org/apache/ignite/internal/storage/ReadResultTest.java} (74%)


[ignite-3] 02/02: IGNITE-17968 Fix write-intents being filtered out in case if it's a tombstone (#1252)

Posted by sd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-3.0.0-beta1
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit c955bd8f823b6586e469869d6d8179d524362300
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Wed Oct 26 19:30:58 2022 +0300

    IGNITE-17968 Fix write-intents being filtered out in case if it's a tombstone (#1252)
---
 .../internal/storage/MvPartitionStorage.java       |  3 +-
 .../apache/ignite/internal/storage/ReadResult.java |  3 +-
 .../storage/AbstractMvPartitionStorageTest.java    | 32 +++++++-------
 .../storage/impl/TestMvPartitionStorage.java       |  2 +-
 .../mv/AbstractPageMemoryMvPartitionStorage.java   | 13 ++++--
 .../storage/rocksdb/RocksDbMvPartitionStorage.java | 49 +++++++++++++++++-----
 6 files changed, 72 insertions(+), 30 deletions(-)

diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index 7f87994f03..52322783ae 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -170,7 +170,8 @@ public interface MvPartitionStorage extends AutoCloseable {
     Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException;
 
     /**
-     * Scans the partition and returns a cursor of values at the given timestamp.
+     * Scans the partition and returns a cursor of values at the given timestamp. This cursor filters out committed tombstones, but not
+     * tombstones in the write-intent state.
      *
      * @param timestamp Timestamp. Can't be {@code null}.
      * @return Cursor.
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
index 0bd10c4adb..0b6a5f0dcb 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
@@ -130,7 +130,8 @@ public class ReadResult {
     }
 
     /**
-     * Returns timestamp of the most recent commit of the row. Might be {@code null} for {@link MvPartitionStorage#scanVersions(RowId)}
+     * Returns timestamp of the most recent commit of the row. Not {@code null} if committed version exists, this is a
+     * write-intent and read was made with a timestamp. Might be {@code null} for {@link MvPartitionStorage#scanVersions(RowId)}
      * even for write intents having a preceding committed version.
      *
      * @return Timestamp of the most recent commit of the row.
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 76c6db0b00..599265679f 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -1269,30 +1269,34 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
 
     @ParameterizedTest
     @EnumSource(ScanTimestampProvider.class)
-    public void scanDoesNotSeeTombstonesWhenTombstoneIsNotCommitted(ScanTimestampProvider tsProvider) throws Exception {
-        testScanDoesNotSeeTombstones(tsProvider, false);
+    public void scanSeesTombstonesWhenTombstoneIsNotCommitted(ScanTimestampProvider tsProvider) throws Exception {
+        RowId rowId = insert(binaryRow, txId);
+        HybridTimestamp commitTs = clock.now();
+        commitWrite(rowId, commitTs);
+
+        addWrite(rowId, null, newTransactionId());
+
+        try (PartitionTimestampCursor cursor = scan(tsProvider.scanTimestamp(clock))) {
+            assertTrue(cursor.hasNext());
+
+            ReadResult next = cursor.next();
+            assertNull(next.binaryRow());
+            assertEquals(commitTs, next.newestCommitTimestamp());
+
+            assertFalse(cursor.hasNext());
+        }
     }
 
     @ParameterizedTest
     @EnumSource(ScanTimestampProvider.class)
     public void scanDoesNotSeeTombstonesWhenTombstoneIsCommitted(ScanTimestampProvider tsProvider) throws Exception {
-        testScanDoesNotSeeTombstones(tsProvider, true);
-    }
-
-    private void testScanDoesNotSeeTombstones(ScanTimestampProvider scantsProvider, boolean commitRemoval) throws Exception {
         RowId rowId = insert(binaryRow, txId);
         commitWrite(rowId, clock.now());
 
         addWrite(rowId, null, newTransactionId());
-        if (commitRemoval) {
-            commitWrite(rowId, clock.now());
-        }
-
-        assertScanSeesNothing(scantsProvider);
-    }
+        commitWrite(rowId, clock.now());
 
-    private void assertScanSeesNothing(ScanTimestampProvider scanTsProvider) throws Exception {
-        try (PartitionTimestampCursor cursor = scan(scanTsProvider.scanTimestamp(clock))) {
+        try (PartitionTimestampCursor cursor = scan(tsProvider.scanTimestamp(clock))) {
             assertFalse(cursor.hasNext());
         }
     }
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index f6043ae9a8..50e27bd297 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -351,7 +351,7 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
                     VersionChain chain = iterator.next();
                     ReadResult readResult = read(chain, timestamp, null);
 
-                    if (!readResult.isEmpty()) {
+                    if (!readResult.isEmpty() || readResult.isWriteIntent()) {
                         currentChain = chain;
                         currentReadResult = readResult;
 
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 3297c8cbc6..d05f62ed46 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -296,7 +296,14 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         if (versionChain.isUncommitted()) {
             assert versionChain.transactionId() != null;
 
-            return writeIntentToResult(versionChain, rowVersion, null);
+            HybridTimestamp newestCommitTs = null;
+
+            if (versionChain.hasCommittedVersions()) {
+                long newestCommitLink = versionChain.newestCommittedLink();
+                newestCommitTs = readRowVersion(newestCommitLink, ALWAYS_LOAD_VALUE).timestamp();
+            }
+
+            return writeIntentToResult(versionChain, rowVersion, newestCommitTs);
         } else {
             ByteBufferRow row = rowVersionToBinaryRow(rowVersion);
 
@@ -798,7 +805,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
                 VersionChain chain = treeCursor.next();
                 ReadResult result = findRowVersionByTimestamp(chain, timestamp);
 
-                if (result.isEmpty()) {
+                if (result.isEmpty() && !result.isWriteIntent()) {
                     continue;
                 }
 
@@ -841,7 +848,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
                 VersionChain chain = treeCursor.next();
                 ReadResult result = findLatestRowVersion(chain);
 
-                if (result.isEmpty()) {
+                if (result.isEmpty() && !result.isWriteIntent()) {
                     continue;
                 }
 
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 1a598edd3b..06862c8091 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -1115,9 +1115,11 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
             currentRowId = null;
 
             // Prepare direct buffer slice to read keys from the iterator.
-            ByteBuffer directBuffer = MV_KEY_BUFFER.get().position(0);
+            ByteBuffer currentKeyBuffer = MV_KEY_BUFFER.get().position(0);
 
             while (true) {
+                currentKeyBuffer.position(0);
+
                 // At this point, seekKeyBuf should contain row id that's above the one we already scanned, but not greater than any
                 // other row id in partition. When we start, row id is filled with zeroes. Value during the iteration is described later
                 // in this code. Now let's describe what we'll find, assuming that iterator found something:
@@ -1140,15 +1142,17 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
                 }
 
                 // Read the actual key into a direct buffer.
-                int keyLength = it.key(directBuffer.limit(MAX_KEY_SIZE));
+                int keyLength = it.key(currentKeyBuffer.limit(MAX_KEY_SIZE));
 
                 boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
 
-                directBuffer.limit(ROW_PREFIX_SIZE);
+                currentKeyBuffer.limit(ROW_PREFIX_SIZE);
+
+                RowId rowId = getRowId(currentKeyBuffer);
 
                 // Copy actual row id into a "seekKeyBuf" buffer.
-                seekKeyBuf.putLong(ROW_ID_OFFSET, directBuffer.getLong(ROW_ID_OFFSET));
-                seekKeyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, directBuffer.getLong(ROW_ID_OFFSET + Long.BYTES));
+                seekKeyBuf.putLong(ROW_ID_OFFSET, normalize(rowId.mostSignificantBits()));
+                seekKeyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, normalize(rowId.leastSignificantBits()));
 
                 // This one might look tricky. We finished processing next row. There are three options:
                 //  - "found" flag is false - there's no fitting version of the row. We'll continue to next iteration;
@@ -1168,12 +1172,37 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
                 // Cache row and return "true" if it's found and not a tombstone.
                 byte[] valueBytes = it.value();
 
-                directBuffer.limit(keyLength);
-                ReadResult readResult = readResultFromKeyAndValue(isWriteIntent, directBuffer, valueBytes);
+                HybridTimestamp nextCommitTimestamp = null;
+
+                if (isWriteIntent) {
+                    it.next();
+
+                    if (!invalid(it)) {
+                        ByteBuffer key = ByteBuffer.wrap(it.key()).order(KEY_BYTE_ORDER);
+
+                        if (matches(rowId, key)) {
+                            // This is a next version of current row.
+                            nextCommitTimestamp = readTimestamp(key);
+                        }
+                    }
+                }
+
+                currentKeyBuffer.limit(keyLength);
+
+                assert valueBytes != null;
+
+                ReadResult readResult;
+
+                if (!isWriteIntent) {
+                    // There is no write-intent, return latest committed row.
+                    readResult = wrapCommittedValue(valueBytes, readTimestamp(currentKeyBuffer));
+                } else {
+                    readResult = wrapUncommittedValue(valueBytes, nextCommitTimestamp);
+                }
 
-                if (!readResult.isEmpty()) {
+                if (!readResult.isEmpty() || readResult.isWriteIntent()) {
                     next = readResult;
-                    currentRowId = getRowId(directBuffer);
+                    currentRowId = rowId;
 
                     return true;
                 }
@@ -1224,7 +1253,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
                 ReadResult readResult = handleReadByTimestampIterator(it, rowId, timestamp, seekKeyBuf);
 
-                if (readResult.isEmpty()) {
+                if (readResult.isEmpty() && !readResult.isWriteIntent()) {
                     // Seek to next row id as we found nothing that matches.
                     incrementRowId(seekKeyBuf);
 


[ignite-3] 01/02: IGNITE-17864 Optimize scan(HybridTimestamp.MAX_VALUE) and read(HybridTimestamp.MAX_VALUE) (#1207)

Posted by sd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-3.0.0-beta1
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit ca20e3c14982e36779ab9a929ca6d6ddb774ac36
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Mon Oct 17 18:07:50 2022 +0400

    IGNITE-17864 Optimize scan(HybridTimestamp.MAX_VALUE) and read(HybridTimestamp.MAX_VALUE) (#1207)
---
 .../apache/ignite/internal/storage/ReadResult.java |  10 +-
 .../ignite/internal/storage/ReadResultTest.java    |  36 ++
 .../storage/AbstractMvPartitionStorageTest.java    | 150 +++++-
 .../mv/AbstractPageMemoryMvPartitionStorage.java   | 244 +++------
 .../storage/rocksdb/RocksDbMvPartitionStorage.java | 585 ++++++++++-----------
 5 files changed, 516 insertions(+), 509 deletions(-)

diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
index 735f7dfb11..0bd10c4adb 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
@@ -32,7 +32,7 @@ public class ReadResult {
     /** Empty read result. */
     public static final ReadResult EMPTY = new ReadResult(null, null, null, null, null, UNDEFINED_COMMIT_PARTITION_ID);
 
-    /** Data. */
+    /** Data. {@code null} iff the result is empty (i.e. no row exists or it is a tombstone). */
     private final @Nullable BinaryRow binaryRow;
 
     /** Transaction id. Not {@code null} iff this is a write-intent. */
@@ -90,11 +90,11 @@ public class ReadResult {
     }
 
     /**
-     * Returns binary row representation of the data.
+     * Returns binary row representation of the data, {@code null} if {@link #isEmpty()}.
      *
-     * @return Binary row representation of the data.
+     * @return Binary row representation of the data, {@code null} if {@link #isEmpty()}.
      */
-    public BinaryRow binaryRow() {
+    public @Nullable BinaryRow binaryRow() {
         return binaryRow;
     }
 
@@ -155,6 +155,6 @@ public class ReadResult {
     }
 
     public boolean isEmpty() {
-        return this == EMPTY;
+        return binaryRow == null;
     }
 }
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/ReadResultTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/ReadResultTest.java
new file mode 100644
index 0000000000..9310392568
--- /dev/null
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/ReadResultTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.Test;
+
+class ReadResultTest {
+    @Test
+    void resultInEmptyConstantIsEmpty() {
+        assertTrue(ReadResult.EMPTY.isEmpty());
+    }
+
+    @Test
+    void resultWithNullRowIsEmpty() {
+        ReadResult result = ReadResult.createFromCommitted(null, null);
+
+        assertTrue(result.isEmpty());
+    }
+}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 98e25e1f2f..76c6db0b00 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -48,6 +48,8 @@ import org.apache.ignite.internal.tx.Timestamp;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 /**
  * Base test for MV partition storages.
@@ -155,9 +157,10 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
         assertNull(read(rowId, clock.now()));
     }
 
-    @Test
-    public void testScanOverEmpty() throws Exception {
-        assertEquals(List.of(), convert(scan(clock.now())));
+    @ParameterizedTest
+    @EnumSource
+    public void testScanOverEmpty(ScanTimestampProvider tsProvider) throws Exception {
+        assertEquals(List.of(), convert(scan(tsProvider.scanTimestamp(clock))));
     }
 
     /**
@@ -302,8 +305,11 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
 
         assertEquals(List.of(value1, value2), convert(scan(ts4)));
         assertEquals(List.of(value1, value2), convert(scan(ts5)));
+
+        assertEquals(List.of(value1, value2), convert(scan(HybridTimestamp.MAX_VALUE)));
     }
 
+    @SuppressWarnings("ConstantConditions")
     @Test
     public void testTransactionScanCursorInvariants() throws Exception {
         TestValue value1 = new TestValue(10, "xxx");
@@ -338,6 +344,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
         }
     }
 
+    @SuppressWarnings("ConstantConditions")
     @Test
     public void testTimestampScanCursorInvariants() throws Exception {
         TestValue value11 = new TestValue(10, "xxx");
@@ -409,6 +416,8 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
             assertFalse(cursor.hasNext());
             assertFalse(cursor.hasNext());
 
+            assertThrows(NoSuchElementException.class, () -> cursor.next());
+
             assertThrows(IllegalStateException.class, () -> cursor.committed(commitTs1));
         }
     }
@@ -827,11 +836,12 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
         });
     }
 
-    @Test
-    void scanByTimestampWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite() throws Exception {
+    @ParameterizedTest
+    @EnumSource(ScanTimestampProvider.class)
+    void scanWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite(ScanTimestampProvider tsProvider) throws Exception {
         commitAbortAndAddUncommitted();
 
-        try (Cursor<ReadResult> cursor = storage.scan(clock.now())) {
+        try (Cursor<ReadResult> cursor = storage.scan(tsProvider.scanTimestamp(clock))) {
             BinaryRow foundRow = cursor.next().binaryRow();
 
             assertRowMatches(foundRow, binaryRow3);
@@ -1086,23 +1096,12 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
         }
     }
 
-    @Test
-    void testScanWithWriteIntent() throws Exception {
-        RowId rowId1 = new RowId(PARTITION_ID);
-
-        HybridTimestamp commit1ts = clock.now();
-
-        storage.runConsistently(() -> {
-            addWrite(rowId1, binaryRow, newTransactionId());
-
-            commitWrite(rowId1, commit1ts);
+    @ParameterizedTest
+    @EnumSource(ScanTimestampProvider.class)
+    void testScanWithWriteIntent(ScanTimestampProvider tsProvider) throws Exception {
+        HybridTimestamp commitTs = addCommittedVersionAndWriteIntent();
 
-            addWrite(rowId1, binaryRow2, newTransactionId());
-
-            return null;
-        });
-
-        try (PartitionTimestampCursor cursor = storage.scan(clock.now())) {
+        try (PartitionTimestampCursor cursor = storage.scan(tsProvider.scanTimestamp(clock))) {
             assertTrue(cursor.hasNext());
 
             ReadResult next = cursor.next();
@@ -1111,12 +1110,30 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
 
             assertRowMatches(next.binaryRow(), binaryRow2);
 
-            BinaryRow committedRow = cursor.committed(next.newestCommitTimestamp());
+            BinaryRow committedRow = cursor.committed(commitTs);
 
             assertRowMatches(committedRow, binaryRow);
         }
     }
 
+    private HybridTimestamp addCommittedVersionAndWriteIntent() {
+        RowId rowId = new RowId(PARTITION_ID);
+
+        HybridTimestamp commitTs = clock.now();
+
+        storage.runConsistently(() -> {
+            addWrite(rowId, binaryRow, newTransactionId());
+
+            commitWrite(rowId, commitTs);
+
+            addWrite(rowId, binaryRow2, newTransactionId());
+
+            return null;
+        });
+
+        return commitTs;
+    }
+
     @Test
     void testScanVersionsWithWriteIntent() throws Exception {
         RowId rowId = new RowId(PARTITION_ID, 100, 0);
@@ -1141,8 +1158,8 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
 
         assertEquals(2, list.size());
 
-        for (int i = 0; i < list.size(); i++) {
-            assertEquals(key, list.get(i).getKey());
+        for (IgniteBiTuple<TestKey, TestValue> objects : list) {
+            assertEquals(key, objects.getKey());
         }
 
         assertEquals(value2, list.get(0).getValue());
@@ -1235,6 +1252,72 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
         }
     }
 
+    @ParameterizedTest
+    @EnumSource(ScanTimestampProvider.class)
+    public void scanCursorHasNextReturnsFalseEachTimeAfterExhaustion(ScanTimestampProvider tsProvider) throws Exception {
+        RowId rowId = insert(binaryRow, txId);
+        commitWrite(rowId, clock.now());
+
+        try (PartitionTimestampCursor cursor = scan(tsProvider.scanTimestamp(clock))) {
+            cursor.next();
+
+            assertFalse(cursor.hasNext());
+            //noinspection ConstantConditions
+            assertFalse(cursor.hasNext());
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(ScanTimestampProvider.class)
+    public void scanDoesNotSeeTombstonesWhenTombstoneIsNotCommitted(ScanTimestampProvider tsProvider) throws Exception {
+        testScanDoesNotSeeTombstones(tsProvider, false);
+    }
+
+    @ParameterizedTest
+    @EnumSource(ScanTimestampProvider.class)
+    public void scanDoesNotSeeTombstonesWhenTombstoneIsCommitted(ScanTimestampProvider tsProvider) throws Exception {
+        testScanDoesNotSeeTombstones(tsProvider, true);
+    }
+
+    private void testScanDoesNotSeeTombstones(ScanTimestampProvider scantsProvider, boolean commitRemoval) throws Exception {
+        RowId rowId = insert(binaryRow, txId);
+        commitWrite(rowId, clock.now());
+
+        addWrite(rowId, null, newTransactionId());
+        if (commitRemoval) {
+            commitWrite(rowId, clock.now());
+        }
+
+        assertScanSeesNothing(scantsProvider);
+    }
+
+    private void assertScanSeesNothing(ScanTimestampProvider scanTsProvider) throws Exception {
+        try (PartitionTimestampCursor cursor = scan(scanTsProvider.scanTimestamp(clock))) {
+            assertFalse(cursor.hasNext());
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(ScanTimestampProvider.class)
+    void committedMethodCallDoesNotInterfereWithIteratingOverScanCursor(ScanTimestampProvider scanTsProvider) throws Exception {
+        RowId rowId1 = insert(binaryRow, txId);
+        HybridTimestamp commitTs1 = clock.now();
+        commitWrite(rowId1, commitTs1);
+
+        insert(binaryRow2, txId);
+
+        try (PartitionTimestampCursor cursor = scan(scanTsProvider.scanTimestamp(clock))) {
+            cursor.next();
+
+            cursor.committed(commitTs1);
+
+            ReadResult result2 = cursor.next();
+            assertRowMatches(result2.binaryRow(), binaryRow2);
+
+            assertFalse(cursor.hasNext());
+        }
+    }
+
     /**
      * Returns row id that is lexicographically smaller (by the value of one) than the argument.
      *
@@ -1253,4 +1336,21 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
 
         return new RowId(value.partitionId(), msb, lsb);
     }
+
+    private enum ScanTimestampProvider {
+        NOW {
+            @Override
+            HybridTimestamp scanTimestamp(HybridClock clock) {
+                return clock.now();
+            }
+        },
+        MAX_VALUE {
+            @Override
+            HybridTimestamp scanTimestamp(HybridClock clock) {
+                return HybridTimestamp.MAX_VALUE;
+            }
+        };
+
+        abstract HybridTimestamp scanTimestamp(HybridClock clock);
+    }
 }
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 60dccdc65e..3297c8cbc6 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -73,8 +73,6 @@ import org.jetbrains.annotations.Nullable;
 public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitionStorage {
     private static final byte[] TOMBSTONE_PAYLOAD = new byte[0];
 
-    private static final Predicate<BinaryRow> MATCH_ALL = row -> true;
-
     private static final Predicate<HybridTimestamp> ALWAYS_LOAD_VALUE = timestamp -> true;
 
     protected final int partitionId;
@@ -260,32 +258,6 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         }
     }
 
-    /**
-     * Reads either the committed value from the storage or the uncommitted value belonging to given transaction.
-     *
-     * @param rowId Row id.
-     * @param txId Transaction id.
-     * @return Read result that corresponds to the key or {@code null} if value is not found.
-     * @throws TxIdMismatchException If there's another pending update associated with different transaction id.
-     * @throws StorageException If failed to read data from the storage.
-     */
-    // TODO: IGNITE-17864 Optimize scan(HybridTimestamp.MAX_VALUE) and read(HybridTimestamp.MAX_VALUE)
-    @Deprecated
-    public @Nullable BinaryRow read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException {
-        if (rowId.partitionId() != partitionId) {
-            throw new IllegalArgumentException(
-                    String.format("RowId partition [%d] is not equal to storage partition [%d].", rowId.partitionId(), partitionId));
-        }
-
-        VersionChain versionChain = findVersionChain(rowId);
-
-        if (versionChain == null) {
-            return null;
-        }
-
-        return findLatestRowVersion(versionChain, txId, MATCH_ALL);
-    }
-
     @Override
     public ReadResult read(RowId rowId, HybridTimestamp timestamp) throws StorageException {
         if (rowId.partitionId() != partitionId) {
@@ -299,7 +271,15 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
             return ReadResult.EMPTY;
         }
 
-        return findRowVersionByTimestamp(versionChain, timestamp);
+        if (lookingForLatestVersion(timestamp)) {
+            return findLatestRowVersion(versionChain);
+        } else {
+            return findRowVersionByTimestamp(versionChain, timestamp);
+        }
+    }
+
+    private boolean lookingForLatestVersion(HybridTimestamp timestamp) {
+        return timestamp == HybridTimestamp.MAX_VALUE;
     }
 
     private @Nullable VersionChain findVersionChain(RowId rowId) {
@@ -310,26 +290,18 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         }
     }
 
-    private @Nullable BinaryRow findLatestRowVersion(VersionChain versionChain, UUID txId, Predicate<BinaryRow> keyFilter) {
+    private ReadResult findLatestRowVersion(VersionChain versionChain) {
         RowVersion rowVersion = readRowVersion(versionChain.headLink(), ALWAYS_LOAD_VALUE);
 
-        ByteBufferRow row = rowVersionToBinaryRow(rowVersion);
-
-        if (keyFilter != null && !keyFilter.test(row)) {
-            return null;
-        }
-
         if (versionChain.isUncommitted()) {
-            UUID chainTxId = versionChain.transactionId();
+            assert versionChain.transactionId() != null;
 
-            assert chainTxId != null;
-
-            throwIfChainBelongsToAnotherTx(versionChain, txId);
+            return writeIntentToResult(versionChain, rowVersion, null);
+        } else {
+            ByteBufferRow row = rowVersionToBinaryRow(rowVersion);
 
-            return row;
+            return ReadResult.createFromCommitted(row, rowVersion.timestamp());
         }
-
-        return row;
     }
 
     private RowVersion readRowVersion(long nextLink, Predicate<HybridTimestamp> loadValue) {
@@ -682,24 +654,9 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         }
     }
 
-    /**
-     * Scans the partition and returns a cursor of values. All filtered values must either be uncommitted in the current transaction
-     * or already committed in a different transaction.
-     *
-     * @param keyFilter Key filter. Binary rows passed to the filter may or may not have a value, filter should only check keys.
-     * @param txId Transaction id.
-     * @return Cursor.
-     * @throws StorageException If failed to read data from the storage.
-     */
-    // TODO: IGNITE-17864 Optimize scan(HybridTimestamp.MAX_VALUE) and read(HybridTimestamp.MAX_VALUE)
-    @Deprecated
-    public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, UUID txId) throws TxIdMismatchException, StorageException {
-        return internalScan(keyFilter, txId);
-    }
-
     @Override
     public PartitionTimestampCursor scan(HybridTimestamp timestamp) throws StorageException {
-        assert timestamp != null;
+        Objects.requireNonNull(timestamp, "timestamp is null");
 
         Cursor<VersionChain> treeCursor;
 
@@ -709,21 +666,11 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
             throw new StorageException("Find failed", e);
         }
 
-        return new TimestampCursor(treeCursor, timestamp);
-    }
-
-    private Cursor<BinaryRow> internalScan(Predicate<BinaryRow> keyFilter, UUID txId) {
-        assert txId != null;
-
-        Cursor<VersionChain> treeCursor;
-
-        try {
-            treeCursor = versionChainTree.find(null, null);
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Find failed", e);
+        if (lookingForLatestVersion(timestamp)) {
+            return new LatestVersionsCursor(treeCursor);
+        } else {
+            return new TimestampCursor(treeCursor, timestamp);
         }
-
-        return new TransactionIdCursor(treeCursor, keyFilter, txId);
     }
 
     @Override
@@ -765,25 +712,67 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         // TODO: IGNITE-17132 Implement it
     }
 
+    private abstract class BasePartitionTimestampCursor implements PartitionTimestampCursor {
+        protected final Cursor<VersionChain> treeCursor;
+
+        @Nullable
+        protected ReadResult nextRead = null;
+
+        @Nullable
+        protected VersionChain currentChain = null;
+
+        protected BasePartitionTimestampCursor(Cursor<VersionChain> treeCursor) {
+            this.treeCursor = treeCursor;
+        }
+
+        @Override
+        public final ReadResult next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException("The cursor is exhausted");
+            }
+
+            assert nextRead != null;
+
+            ReadResult res = nextRead;
+
+            nextRead = null;
+
+            return res;
+        }
+
+        @Override
+        public void close() throws Exception {
+            treeCursor.close();
+        }
+
+        @Override
+        public @Nullable BinaryRow committed(HybridTimestamp timestamp) {
+            if (currentChain == null) {
+                throw new IllegalStateException();
+            }
+
+            ReadResult result = findRowVersionByTimestamp(currentChain, timestamp);
+            if (result.isEmpty()) {
+                return null;
+            }
+
+            // currentChain is the chain last selected by hasNext(), so only its version at the given timestamp is looked up here.
+            return result.binaryRow();
+        }
+    }
+
     /**
      * Implementation of the {@link PartitionTimestampCursor} over the page memory storage.
      * See {@link PartitionTimestampCursor} for the details on the API.
      */
-    private class TimestampCursor implements PartitionTimestampCursor {
-        private final Cursor<VersionChain> treeCursor;
-
+    private class TimestampCursor extends BasePartitionTimestampCursor {
         private final HybridTimestamp timestamp;
 
-        @Nullable
-        private ReadResult nextRead = null;
-
-        @Nullable
-        private VersionChain currentChain = null;
-
         private boolean iterationExhausted = false;
 
         public TimestampCursor(Cursor<VersionChain> treeCursor, HybridTimestamp timestamp) {
-            this.treeCursor = treeCursor;
+            super(treeCursor);
+
             this.timestamp = timestamp;
         }
 
@@ -807,50 +796,18 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
                 }
 
                 VersionChain chain = treeCursor.next();
-                ReadResult res = findRowVersionByTimestamp(chain, timestamp);
+                ReadResult result = findRowVersionByTimestamp(chain, timestamp);
 
-                if (res.isEmpty()) {
+                if (result.isEmpty()) {
                     continue;
                 }
 
-                nextRead = res;
+                nextRead = result;
                 currentChain = chain;
 
                 return true;
             }
         }
-
-        @Override
-        public ReadResult next() {
-            if (!hasNext()) {
-                throw new NoSuchElementException("The cursor is exhausted");
-            }
-
-            assert nextRead != null;
-
-            ReadResult res = nextRead;
-
-            nextRead = null;
-
-            return res;
-        }
-
-        @Override
-        public void close() {
-            // No-op.
-        }
-
-        @Override
-        public @Nullable BinaryRow committed(HybridTimestamp timestamp) {
-            if (currentChain == null) {
-                throw new IllegalStateException();
-            }
-
-            ReadResult res = findRowVersionByTimestamp(currentChain, timestamp);
-
-            // We don't check if row conforms the key filter here, because we've already checked it.
-            return res.binaryRow();
-        }
     }
 
     /**
@@ -858,30 +815,16 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
      * Scans the partition and returns a cursor of values. All filtered values must either be uncommitted in the current transaction
      * or already committed in a different transaction.
      */
-    private class TransactionIdCursor implements Cursor<BinaryRow> {
-        private final Cursor<VersionChain> treeCursor;
-
-        private final Predicate<BinaryRow> keyFilter;
-
-        private final @Nullable UUID transactionId;
-
-        private BinaryRow nextRow = null;
-
+    private class LatestVersionsCursor extends BasePartitionTimestampCursor {
         private boolean iterationExhausted = false;
 
-        public TransactionIdCursor(
-                Cursor<VersionChain> treeCursor,
-                Predicate<BinaryRow> keyFilter,
-                @Nullable UUID transactionId
-        ) {
-            this.treeCursor = treeCursor;
-            this.keyFilter = keyFilter;
-            this.transactionId = transactionId;
+        public LatestVersionsCursor(Cursor<VersionChain> treeCursor) {
+            super(treeCursor);
         }
 
         @Override
         public boolean hasNext() {
-            if (nextRow != null) {
+            if (nextRead != null) {
                 return true;
             }
 
@@ -896,32 +839,17 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
                 }
 
                 VersionChain chain = treeCursor.next();
-                BinaryRow row = findLatestRowVersion(chain, transactionId, keyFilter);
+                ReadResult result = findLatestRowVersion(chain);
 
-                if (row != null) {
-                    nextRow = row;
-                    return true;
+                if (result.isEmpty()) {
+                    continue;
                 }
-            }
-        }
 
-        @Override
-        public BinaryRow next() {
-            if (!hasNext()) {
-                throw new NoSuchElementException("The cursor is exhausted");
-            }
-
-            assert nextRow != null;
-
-            BinaryRow row = nextRow;
-            nextRow = null;
-
-            return row;
-        }
+                nextRead = result;
+                currentChain = chain;
 
-        @Override
-        public void close() {
-            // No-op.
+                return true;
+            }
         }
     }
 }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 85375c72ac..1a598edd3b 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -31,10 +31,10 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.charset.StandardCharsets;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiConsumer;
-import java.util.function.Predicate;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
 import org.apache.ignite.internal.rocksdb.RocksUtils;
@@ -468,31 +468,9 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         }
     }
 
-    /**
-     * Reads either the committed value from the storage or the uncommitted value belonging to given transaction.
-     *
-     * @param rowId Row id.
-     * @param txId Transaction id.
-     * @return Read result that corresponds to the key or {@code null} if value is not found.
-     * @throws TxIdMismatchException If there's another pending update associated with different transaction id.
-     * @throws StorageException If failed to read data from the storage.
-     */
-    // TODO: IGNITE-17864 Optimize scan(HybridTimestamp.MAX_VALUE) and read(HybridTimestamp.MAX_VALUE)
-    @Deprecated
-    public @Nullable BinaryRow read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException {
-        return read(rowId, null, txId).binaryRow();
-    }
-
     /** {@inheritDoc} */
     @Override
     public ReadResult read(RowId rowId, HybridTimestamp timestamp) throws StorageException {
-        return read(rowId, timestamp, null);
-    }
-
-    private ReadResult read(RowId rowId, @Nullable HybridTimestamp timestamp, @Nullable UUID txId)
-            throws TxIdMismatchException, StorageException {
-        assert timestamp == null ^ txId == null;
-
         if (rowId.partitionId() != partitionId) {
             throw new IllegalArgumentException(
                     String.format("RowId partition [%d] is not equal to storage partition [%d].", rowId.partitionId(), partitionId));
@@ -503,55 +481,65 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
         try (
                 // Set next partition as an upper bound.
-                var readOpts = new ReadOptions().setIterateUpperBound(upperBound);
-                RocksIterator baseIterator = db.newIterator(cf, readOpts);
+                var readOpts1 = new ReadOptions().setIterateUpperBound(upperBound);
+                RocksIterator baseIterator = db.newIterator(cf, readOpts1);
                 // "count()" check is mandatory. Write batch iterator without any updates just crashes everything.
                 // It's not documented, but this is exactly how it should be used.
                 RocksIterator seekIterator = writeBatch != null && writeBatch.count() > 0
                         ? writeBatch.newIteratorWithBase(cf, baseIterator)
                         : baseIterator
         ) {
-            if (timestamp == null) {
-                ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+            if (lookingForLatestVersions(timestamp)) {
+                return readLatestVersion(rowId, seekIterator);
+            } else {
+                return readByTimestamp(seekIterator, rowId, timestamp);
+            }
+        }
+    }
 
-                // Seek to the first appearance of row id if timestamp isn't set.
-                // Since timestamps are sorted from newest to oldest, first occurrence will always be the latest version.
-                // Unfortunately, copy here is unavoidable with current API.
-                assert keyBuf.position() == ROW_PREFIX_SIZE;
-                seekIterator.seek(copyOf(keyBuf.array(), ROW_PREFIX_SIZE));
+    private boolean lookingForLatestVersions(HybridTimestamp timestamp) {
+        return timestamp == HybridTimestamp.MAX_VALUE;
+    }
 
-                if (invalid(seekIterator)) {
-                    // No data at all.
-                    return ReadResult.EMPTY;
-                }
+    private ReadResult readLatestVersion(RowId rowId, RocksIterator seekIterator) {
+        ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+
+        // Seek to the first appearance of the row id.
+        // Since timestamps are sorted from newest to oldest, first occurrence will always be the latest version.
+        // Unfortunately, copy here is unavoidable with current API.
+        assert keyBuf.position() == ROW_PREFIX_SIZE;
+        seekIterator.seek(copyOf(keyBuf.array(), ROW_PREFIX_SIZE));
 
-                ByteBuffer readKeyBuf = MV_KEY_BUFFER.get().position(0).limit(MAX_KEY_SIZE);
+        if (invalid(seekIterator)) {
+            // No data at all.
+            return ReadResult.EMPTY;
+        }
 
-                int keyLength = seekIterator.key(readKeyBuf);
+        ByteBuffer readKeyBuf = MV_KEY_BUFFER.get().position(0).limit(MAX_KEY_SIZE);
 
-                if (!matches(rowId, readKeyBuf)) {
-                    // Wrong row id.
-                    return ReadResult.EMPTY;
-                }
+        int keyLength = seekIterator.key(readKeyBuf);
 
-                boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
+        if (!matches(rowId, readKeyBuf)) {
+            // It is already a different row, so no version exists for our rowId.
+            return ReadResult.EMPTY;
+        }
 
-                byte[] valueBytes = seekIterator.value();
+        boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
 
-                if (!isWriteIntent) {
-                    // There is no write-intent, return latest committed row.
-                    return wrapCommittedValue(valueBytes, readTimestamp(readKeyBuf));
-                }
+        byte[] valueBytes = seekIterator.value();
 
-                assert valueBytes != null;
+        return readResultFromKeyAndValue(isWriteIntent, readKeyBuf, valueBytes);
+    }
 
-                validateTxId(valueBytes, txId);
+    private static ReadResult readResultFromKeyAndValue(boolean isWriteIntent, ByteBuffer keyBuf, byte[] valueBytes) {
+        assert valueBytes != null;
 
-                return wrapUncommittedValue(valueBytes, null);
-            } else {
-                return readByTimestamp(seekIterator, rowId, timestamp);
-            }
+        if (!isWriteIntent) {
+            // The key has a timestamp suffix, so this is a committed version: return it with its commit timestamp.
+            return wrapCommittedValue(valueBytes, readTimestamp(keyBuf));
         }
+
+        return wrapUncommittedValue(valueBytes, null);
     }
 
     /**
@@ -562,7 +550,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
      * @param timestamp Timestamp.
      * @return Read result.
      */
-    private @Nullable ReadResult readByTimestamp(RocksIterator seekIterator, RowId rowId, HybridTimestamp timestamp) {
+    private ReadResult readByTimestamp(RocksIterator seekIterator, RowId rowId, HybridTimestamp timestamp) {
         ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
 
         // Put timestamp restriction according to N2O timestamps order.
@@ -685,14 +673,14 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
      * Checks if row id matches the one written in the key buffer. Note: this operation changes the position in the buffer.
      *
      * @param rowId Row id.
-     * @param keyByf Key buffer.
+     * @param keyBuf Key buffer.
      * @return {@code true} if row id matches the key buffer, {@code false} otherwise.
      */
-    private static boolean matches(RowId rowId, ByteBuffer keyByf) {
+    private static boolean matches(RowId rowId, ByteBuffer keyBuf) {
         // Comparison starts from the position of the row id.
-        keyByf.position(ROW_ID_OFFSET);
+        keyBuf.position(ROW_ID_OFFSET);
 
-        return rowId.mostSignificantBits() == normalize(keyByf.getLong()) && rowId.leastSignificantBits() == normalize(keyByf.getLong());
+        return rowId.mostSignificantBits() == normalize(keyBuf.getLong()) && rowId.leastSignificantBits() == normalize(keyBuf.getLong());
     }
 
     @Override
@@ -718,13 +706,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
                 boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
 
-                if (!isWriteIntent) {
-                    return wrapCommittedValue(value, readTimestamp(ByteBuffer.wrap(key).order(KEY_BYTE_ORDER)));
-                }
-
-                assert value != null;
-
-                return wrapUncommittedValue(value, null);
+                return readResultFromKeyAndValue(isWriteIntent, ByteBuffer.wrap(key).order(KEY_BYTE_ORDER), value);
             }
 
             @Override
@@ -736,264 +718,28 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         };
     }
 
-    /**
-     * Scans the partition and returns a cursor of values. All filtered values must either be uncommitted in the current transaction
-     * or already committed in a different transaction.
-     *
-     * @param keyFilter Key filter. Binary rows passed to the filter may or may not have a value, filter should only check keys.
-     * @param txId Transaction id.
-     * @return Cursor.
-     * @throws StorageException If failed to read data from the storage.
-     */
     // TODO: IGNITE-16914 Play with prefix settings and benchmark results.
-    // TODO: IGNITE-17864 Optimize scan(HybridTimestamp.MAX_VALUE) and read(HybridTimestamp.MAX_VALUE)
-    @Deprecated
-    public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, UUID txId) throws TxIdMismatchException, StorageException {
-        assert txId != null;
-
-        RocksIterator it = db.newIterator(cf, scanReadOptions);
-
-        // Seek iterator to the beginning of the partition.
-        it.seek(partitionStartPrefix());
-
-        // Here's seek buffer itself. Originally it contains a valid partition id, row id payload that's filled with zeroes, and maybe
-        // a timestamp value. Zero row id guarantees that it's lexicographically less than or equal to any other row id stored in the
-        // partition.
-        // Byte buffer from a thread-local field can't be used here, because of two reasons:
-        //  - no one guarantees that there will only be a single cursor;
-        //  - no one guarantees that returned cursor will not be used by other threads.
-        // The thing is, we need this buffer to preserve its content between invocations of "hasNext" method.
-        ByteBuffer seekKeyBuf = ByteBuffer.allocate(ROW_PREFIX_SIZE).order(KEY_BYTE_ORDER).putShort((short) partitionId);
-
-        return new Cursor<>() {
-            /** Cached value for {@link #next()} method. Also optimizes the code of {@link #hasNext()}. */
-            private BinaryRow next;
-
-            /** {@inheritDoc} */
-            @Override
-            public boolean hasNext() {
-                // Fast-path for consecutive invocations.
-                if (next != null) {
-                    return true;
-                }
-
-                // Prepare direct buffer slice to read keys from the iterator.
-                ByteBuffer directBuffer = MV_KEY_BUFFER.get().position(0);
-
-                while (true) {
-                    // We should do after each seek. Here in particular it means one of two things:
-                    //  - partition is empty;
-                    //  - iterator exhausted all the data in partition.
-                    if (invalid(it)) {
-                        return false;
-                    }
-
-                    // At this point, seekKeyBuf should contain row id that's above the one we already scanned, but not greater than any
-                    // other row id in partition. When we start, row id is filled with zeroes. Value during the iteration is described later
-                    // in this code. Now let's describe what we'll find, assuming that iterator found something:
-                    //  - if timestamp is null:
-                    //      - this seek will find the newest version of the next row in iterator. Exactly what we need.
-                    //  - if timestamp is not null:
-                    //      - suppose that seek key buffer has the following value: "| P0 | R0 | T0 |" (partition, row id, timestamp)
-                    //        and iterator finds something, let's denote it as "| P0 | R1 | T1 |" (partition must match). Again, there are
-                    //        few possibilities here:
-                    //          - R1 == R0, this means a match. By the rules of ordering we derive that T1 >= T0. Timestamps are stored in
-                    //            descending order, this means that we found exactly what's needed.
-                    //          - R1 > R0, this means that we found next row and T1 is either missing (pending row) or represents the latest
-                    //            version of the row. It doesn't matter in this case, because this row id will be reused to find its value
-                    //            at time T0. Additional "seek" will be required to do it.
-                    it.seek(seekKeyBuf.array());
-
-                    // Finish scan if nothing was found.
-                    if (invalid(it)) {
-                        return false;
-                    }
-
-                    // Read the actual key into a direct buffer.
-                    int keyLength = it.key(directBuffer.limit(MAX_KEY_SIZE));
-
-                    boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
-
-                    directBuffer.limit(ROW_PREFIX_SIZE);
-
-                    // Copy actual row id into a "seekKeyBuf" buffer.
-                    seekKeyBuf.putLong(ROW_ID_OFFSET, directBuffer.getLong(ROW_ID_OFFSET));
-                    seekKeyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, directBuffer.getLong(ROW_ID_OFFSET + Long.BYTES));
-
-                    // This one might look tricky. We finished processing next row. There are three options:
-                    //  - "found" flag is false - there's no fitting version of the row. We'll continue to next iteration;
-                    //  - value is empty, we found a tombstone. We'll continue to next iteration as well;
-                    //  - value is not empty and everything's good. We'll cache it and return from method.
-                    // In all three cases we need to prepare the value of "seekKeyBuf" so that it has not-yet-scanned row id in it.
-                    // the only valid way to do so is to treat row id payload as one big unsigned integer in Big Endian and increment it.
-                    // It's important to note that increment may overflow. In this case "carry flag" will go into incrementing partition id.
-                    // This is fine for three reasons:
-                    //  - iterator has an upper bound, following "seek" will result in invalid iterator state.
-                    //  - partition id itself cannot be overflown, because it's limited with a constant less than 0xFFFF.
-                    // It's something like 65500, I think.
-                    //  - "seekKeyBuf" buffer value will not be used after that, so it's ok if we corrupt its data (in every other instance,
-                    //    buffer starts with a valid partition id, which is set during buffer's initialization).
-                    incrementRowId(seekKeyBuf);
-
-                    // Cache row and return "true" if it's found and not a tombstone.
-                    byte[] valueBytes = it.value();
-
-                    BinaryRow binaryRow = wrapValueIntoBinaryRow(valueBytes, isWriteIntent);
-
-                    if (binaryRow != null && (keyFilter == null || keyFilter.test(binaryRow))) {
-                        if (isWriteIntent) {
-                            validateTxId(valueBytes, txId);
-                        }
-
-                        next = binaryRow;
-
-                        return true;
-                    }
-                }
-            }
-
-            /** {@inheritDoc} */
-            @Override
-            public BinaryRow next() {
-                if (!hasNext()) {
-                    throw new NoSuchElementException();
-                }
-
-                BinaryRow res = next;
-
-                next = null;
-
-                return res;
-            }
-
-            /** {@inheritDoc} */
-            @Override
-            public void close() throws Exception {
-                IgniteUtils.closeAll(it);
-            }
-        };
-    }
-
     /** {@inheritDoc} */
     @Override
     public PartitionTimestampCursor scan(HybridTimestamp timestamp) throws StorageException {
-        assert timestamp != null;
-
-        RocksIterator it = db.newIterator(cf, scanReadOptions);
-
-        // You can see the motivation behind the usage of a separate (not thread-local) buffer in the transaction id
-        // cursor code.
-        ByteBuffer seekKeyBuf = ByteBuffer.allocate(MAX_KEY_SIZE).order(KEY_BYTE_ORDER).putShort((short) partitionId);
-
-        return new PartitionTimestampCursor() {
-
-            private RowId currentRowId;
-
-            /** Cached value for {@link #next()} method. Also optimizes the code of {@link #hasNext()}. */
-            private ReadResult next;
-
-            private void setKeyBuffer(ByteBuffer keyBuf, RowId rowId, HybridTimestamp timestamp) {
-                keyBuf.putLong(ROW_ID_OFFSET, normalize(rowId.mostSignificantBits()));
-                keyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, normalize(rowId.leastSignificantBits()));
-
-                putTimestamp(keyBuf.position(ROW_PREFIX_SIZE), timestamp);
-
-                keyBuf.position(0);
-            }
-
-            @Override
-            public @Nullable BinaryRow committed(HybridTimestamp timestamp) {
-                if (currentRowId == null) {
-                    throw new IllegalStateException();
-                }
-
-                setKeyBuffer(seekKeyBuf, currentRowId, timestamp);
-
-                it.seek(seekKeyBuf.array());
-
-                ReadResult readResult = handleReadByTimestampIterator(it, currentRowId, timestamp, seekKeyBuf);
-
-                if (readResult.isEmpty()) {
-                    return null;
-                }
-
-                // We don't check if row conforms the key filter here, because we've already checked it.
-                return readResult.binaryRow();
-            }
-
-            /** {@inheritDoc} */
-            @Override
-            public boolean hasNext() {
-                // Fast-path for consecutive invocations.
-                if (next != null) {
-                    return true;
-                }
-
-                if (currentRowId != null) {
-                    setKeyBuffer(seekKeyBuf, currentRowId, timestamp);
-                    incrementRowId(seekKeyBuf);
-                }
-
-                currentRowId = null;
-
-                // Prepare direct buffer slice to read keys from the iterator.
-                ByteBuffer directBuffer = MV_KEY_BUFFER.get().position(0);
-
-                while (true) {
-                    it.seek(seekKeyBuf.array());
-
-                    if (invalid(it)) {
-                        return false;
-                    }
+        Objects.requireNonNull(timestamp, "timestamp is null");
 
-                    // We need to figure out what current row id is.
-                    it.key(directBuffer.position(0));
-
-                    directBuffer.position(ROW_ID_OFFSET);
-
-                    RowId rowId = getRowId(directBuffer);
-
-                    setKeyBuffer(seekKeyBuf, rowId, timestamp);
-
-                    // Seek to current row id + timestamp.
-                    it.seek(seekKeyBuf.array());
-
-                    ReadResult readResult = handleReadByTimestampIterator(it, rowId, timestamp, seekKeyBuf);
-
-                    if (readResult.isEmpty()) {
-                        // Seek to next row id as we found nothing that matches.
-                        incrementRowId(seekKeyBuf);
-
-                        continue;
-                    }
-
-                    next = readResult;
-                    currentRowId = rowId;
-
-                    return true;
-                }
-            }
-
-            /** {@inheritDoc} */
-            @Override
-            public ReadResult next() {
-                if (!hasNext()) {
-                    throw new NoSuchElementException();
-                }
-
-                ReadResult res = next;
+        if (lookingForLatestVersions(timestamp)) {
+            return new ScanLatestVersionsCursor();
+        } else {
+            return new ScanByTimestampCursor(timestamp);
+        }
+    }
 
-                next = null;
+    private void setKeyBuffer(ByteBuffer keyBuf, RowId rowId, @Nullable HybridTimestamp timestamp) {
+        keyBuf.putLong(ROW_ID_OFFSET, normalize(rowId.mostSignificantBits()));
+        keyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, normalize(rowId.leastSignificantBits()));
 
-                return res;
-            }
+        if (timestamp != null) {
+            putTimestamp(keyBuf.position(ROW_PREFIX_SIZE), timestamp);
+        }
 
-            /** {@inheritDoc} */
-            @Override
-            public void close() throws Exception {
-                IgniteUtils.closeAll(it);
-            }
-        };
+        keyBuf.position(0);
     }
 
     @Override
@@ -1013,8 +759,6 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
             it.key(readKeyBuf);
 
-            readKeyBuf.position(ROW_ID_OFFSET);
-
             return getRowId(readKeyBuf);
         } finally {
             keyBuf.limit(MAX_KEY_SIZE);
@@ -1047,8 +791,10 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         buf.position(0);
     }
 
-    private RowId getRowId(ByteBuffer readKeyBuf) {
-        return new RowId(partitionId, normalize(readKeyBuf.getLong()), normalize(readKeyBuf.getLong()));
+    /**
+     * Reads the row id from a key stored in {@code buffer}.
+     *
+     * <p>The buffer position is first set to {@code ROW_ID_OFFSET}, so callers don't need to position it themselves. After the
+     * call, the position is just past the two longs that were read. The resulting {@link RowId} uses this storage's
+     * {@code partitionId}, not the partition bytes stored in the key.
+     *
+     * @param buffer Buffer holding a key of at least the row prefix length.
+     * @return Row id decoded from the key.
+     */
+    private RowId getRowId(ByteBuffer buffer) {
+        buffer.position(ROW_ID_OFFSET);
+
+        return new RowId(partitionId, normalize(buffer.getLong()), normalize(buffer.getLong()));
     }
 
     @Override
@@ -1088,7 +834,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
                 boolean valueHasTxId = keyBytes.length == ROW_PREFIX_SIZE;
 
                 if (!isTombstone(valueBytes, valueHasTxId)) {
-                    ByteBuffer keyBuf = ByteBuffer.wrap(keyBytes).order(KEY_BYTE_ORDER).position(ROW_ID_OFFSET);
+                    ByteBuffer keyBuf = ByteBuffer.wrap(keyBytes).order(KEY_BYTE_ORDER);
 
                     RowId rowId = getRowId(keyBuf);
 
@@ -1295,4 +1041,201 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
+    /**
+     * Checks whether a stored value carries no row payload (a tombstone).
+     *
+     * @param valueBytes Raw value bytes read from RocksDB.
+     * @param hasTxId Whether the value has a header, as it does for write intents (keys of exactly {@code ROW_PREFIX_SIZE}
+     *      bytes). A tombstone is then exactly {@code VALUE_HEADER_SIZE} bytes long; otherwise it is empty.
+     * @return {@code true} if there are no bytes beyond the expected header.
+     */
     private static boolean isTombstone(byte[] valueBytes, boolean hasTxId) {
         return valueBytes.length == (hasTxId ? VALUE_HEADER_SIZE : 0);
     }
+
+    /**
+     * Base class for partition scan cursors.
+     *
+     * <p>Holds the RocksDB iterator, the seek key buffer and the result cached between {@link #hasNext()} and {@link #next()}.
+     * Subclasses implement {@link #hasNext()}.
+     */
+    private abstract class BasePartitionTimestampCursor implements PartitionTimestampCursor {
+        /** Iterator over the partition's column family; closed in {@link #close()}. */
+        protected final RocksIterator it = db.newIterator(cf, scanReadOptions);
+
+        // This is the seek buffer. Initially it contains a valid partition id and a row id payload filled with zeroes. The bytes
+        // after the row prefix may later hold a timestamp (written by "committed" or by subclasses). A zero row id is
+        // lexicographically less than or equal to any other row id stored in the partition.
+        // A byte buffer from a thread-local field can't be used here, for two reasons:
+        //  - nothing guarantees that there will only be a single cursor;
+        //  - nothing guarantees that the returned cursor will not be used by other threads.
+        // We need this buffer to keep its content between invocations of the "hasNext" method.
+        protected final ByteBuffer seekKeyBuf = ByteBuffer.allocate(MAX_KEY_SIZE).order(KEY_BYTE_ORDER).putShort((short) partitionId);
+
+        // Row id of the row found by the last successful "hasNext" call. Subclasses reset it to null at the start of each search,
+        // so it stays null once the scan is exhausted.
+        protected RowId currentRowId;
+
+        /** Cached value for {@link #next()} method. Also optimizes the code of {@link #hasNext()}. */
+        protected ReadResult next;
+
+        /**
+         * Reads the current row (see {@code currentRowId}) as of {@code timestamp} using {@code handleReadByTimestampIterator}.
+         *
+         * <p>NOTE(review): this overwrites {@code seekKeyBuf}, including the timestamp bytes after the row prefix, and those bytes
+         * remain after the call. Subclasses must not rely on the trailing bytes of {@code seekKeyBuf} when seeking.
+         *
+         * @param timestamp Timestamp to read at, not {@code null}.
+         * @return Binary row of the read result, or {@code null} if the result is empty.
+         * @throws IllegalStateException If there's no current row.
+         */
+        @Override
+        public @Nullable BinaryRow committed(HybridTimestamp timestamp) {
+            Objects.requireNonNull(timestamp, "timestamp is null");
+
+            if (currentRowId == null) {
+                throw new IllegalStateException("currentRowId is null");
+            }
+
+            setKeyBuffer(seekKeyBuf, currentRowId, timestamp);
+
+            it.seek(seekKeyBuf.array());
+
+            ReadResult readResult = handleReadByTimestampIterator(it, currentRowId, timestamp, seekKeyBuf);
+
+            if (readResult.isEmpty()) {
+                return null;
+            }
+
+            return readResult.binaryRow();
+        }
+
+        /**
+         * Returns the result cached by {@link #hasNext()} and clears the cache.
+         *
+         * @throws NoSuchElementException If there are no more rows.
+         */
+        @Override
+        public final ReadResult next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+
+            ReadResult res = next;
+
+            next = null;
+
+            return res;
+        }
+
+        /** Closes the underlying RocksDB iterator. */
+        @Override
+        public final void close() throws Exception {
+            it.close();
+        }
+    }
+
+    /**
+     * Cursor over the latest version of every row in the partition: the write intent if one exists, otherwise the newest
+     * committed version. Rows for which {@code readResultFromKeyAndValue} returns an empty result are skipped.
+     */
+    private final class ScanLatestVersionsCursor extends BasePartitionTimestampCursor {
+        @Override
+        public boolean hasNext() {
+            // Fast-path for consecutive invocations.
+            if (next != null) {
+                return true;
+            }
+
+            if (currentRowId != null) {
+                // Resume right after the row found by the previous successful "hasNext" call.
+                setKeyBuffer(seekKeyBuf, currentRowId, null);
+                incrementRowId(seekKeyBuf);
+            }
+
+            currentRowId = null;
+
+            // Direct buffer used to read keys from the iterator.
+            ByteBuffer directBuffer = MV_KEY_BUFFER.get();
+
+            while (true) {
+                // At this point, the row prefix of "seekKeyBuf" holds a row id above the ones already scanned, but not greater than any
+                // row id not yet scanned. When we start, the row id is filled with zeroes.
+                // Only the row prefix (partition id + row id) is used as the seek key. The bytes after it may hold a timestamp left by
+                // "committed(...)", and even zero bytes there would make the seek key longer than a write-intent key (exactly
+                // ROW_PREFIX_SIZE long). Such a seek would skip the row's write intent and could land on an older committed version.
+                // With the prefix alone, the seek lands on the row's write intent if there is one, otherwise on its newest committed
+                // version (timestamps are stored in descending order), or on the next row if this one doesn't exist.
+                it.seek(java.util.Arrays.copyOf(seekKeyBuf.array(), ROW_PREFIX_SIZE));
+
+                // Finish scan if nothing was found.
+                if (invalid(it)) {
+                    return false;
+                }
+
+                // Read the actual key into the direct buffer, always starting from its beginning.
+                int keyLength = it.key(directBuffer.position(0).limit(MAX_KEY_SIZE));
+
+                boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
+
+                directBuffer.limit(ROW_PREFIX_SIZE);
+
+                // Copy actual row id into a "seekKeyBuf" buffer.
+                seekKeyBuf.putLong(ROW_ID_OFFSET, directBuffer.getLong(ROW_ID_OFFSET));
+                seekKeyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, directBuffer.getLong(ROW_ID_OFFSET + Long.BYTES));
+
+                // This row id is done whatever the outcome below: the row is either returned or skipped because its result is empty.
+                // Either way, "seekKeyBuf" must hold a not-yet-scanned row id. To get one, the row id payload is treated as one big
+                // unsigned Big Endian integer and incremented.
+                // The increment may overflow and carry into the partition id. This is acceptable because:
+                //  - the iterator has an upper bound, so the following "seek" leaves it in an invalid state;
+                //  - the partition id itself is assumed never to overflow, being limited by a constant less than 0xFFFF;
+                //  - the corrupted "seekKeyBuf" value is not used after that. In every other case the buffer starts with a valid
+                //    partition id, which is set during the buffer's initialization.
+                incrementRowId(seekKeyBuf);
+
+                byte[] valueBytes = it.value();
+
+                directBuffer.limit(keyLength);
+
+                ReadResult readResult = readResultFromKeyAndValue(isWriteIntent, directBuffer, valueBytes);
+
+                // Cache the result and return "true" if it's not empty; otherwise move on to the next row.
+                if (!readResult.isEmpty()) {
+                    next = readResult;
+                    currentRowId = getRowId(directBuffer);
+
+                    return true;
+                }
+            }
+        }
+    }
+
+    /**
+     * Cursor over all rows of the partition as of a given timestamp.
+     *
+     * <p>Each row is resolved with {@code handleReadByTimestampIterator}; rows for which it returns an empty result are skipped.
+     */
+    private final class ScanByTimestampCursor extends BasePartitionTimestampCursor {
+        private final HybridTimestamp timestamp;
+
+        public ScanByTimestampCursor(HybridTimestamp timestamp) {
+            this.timestamp = timestamp;
+        }
+
+        @Override
+        public boolean hasNext() {
+            // Fast-path for consecutive invocations.
+            if (next != null) {
+                return true;
+            }
+
+            if (currentRowId != null) {
+                // Resume right after the row found by the previous successful "hasNext" call. The seek below uses only the row
+                // prefix, so no timestamp needs to be written here.
+                setKeyBuffer(seekKeyBuf, currentRowId, null);
+                incrementRowId(seekKeyBuf);
+            }
+
+            currentRowId = null;
+
+            // Direct buffer used to read keys from the iterator.
+            ByteBuffer directBuffer = MV_KEY_BUFFER.get();
+
+            while (true) {
+                // Find the first key of the next not-yet-scanned row. Only the row prefix (partition id + row id) is used as the seek
+                // key. The bytes after it may hold a timestamp written below or by "committed(...)", and a longer seek key would skip a
+                // write-intent key (exactly ROW_PREFIX_SIZE long). A row that has only a write intent could then be skipped entirely.
+                it.seek(java.util.Arrays.copyOf(seekKeyBuf.array(), ROW_PREFIX_SIZE));
+
+                if (invalid(it)) {
+                    return false;
+                }
+
+                // We need to figure out what current row id is.
+                it.key(directBuffer.position(0).limit(MAX_KEY_SIZE));
+
+                RowId rowId = getRowId(directBuffer);
+
+                setKeyBuffer(seekKeyBuf, rowId, timestamp);
+
+                // Seek to current row id + timestamp.
+                it.seek(seekKeyBuf.array());
+
+                ReadResult readResult = handleReadByTimestampIterator(it, rowId, timestamp, seekKeyBuf);
+
+                if (readResult.isEmpty()) {
+                    // Seek to next row id as we found nothing that matches.
+                    incrementRowId(seekKeyBuf);
+
+                    continue;
+                }
+
+                next = readResult;
+                currentRowId = rowId;
+
+                return true;
+            }
+        }
+    }
 }