You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/10/26 16:55:49 UTC

[ignite-3] 01/02: IGNITE-17864 Optimize scan(HybridTimestamp.MAX_VALUE) and read(HybridTimestamp.MAX_VALUE) (#1207)

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-3.0.0-beta1
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit ca20e3c14982e36779ab9a929ca6d6ddb774ac36
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Mon Oct 17 18:07:50 2022 +0400

    IGNITE-17864 Optimize scan(HybridTimestamp.MAX_VALUE) and read(HybridTimestamp.MAX_VALUE) (#1207)
---
 .../apache/ignite/internal/storage/ReadResult.java |  10 +-
 .../ignite/internal/storage/ReadResultTest.java    |  36 ++
 .../storage/AbstractMvPartitionStorageTest.java    | 150 +++++-
 .../mv/AbstractPageMemoryMvPartitionStorage.java   | 244 +++------
 .../storage/rocksdb/RocksDbMvPartitionStorage.java | 585 ++++++++++-----------
 5 files changed, 516 insertions(+), 509 deletions(-)

diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
index 735f7dfb11..0bd10c4adb 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
@@ -32,7 +32,7 @@ public class ReadResult {
     /** Empty read result. */
     public static final ReadResult EMPTY = new ReadResult(null, null, null, null, null, UNDEFINED_COMMIT_PARTITION_ID);
 
-    /** Data. */
+    /** Data. {@code null} iff the result is empty (i.e. no row exists or it is a tombstone). */
     private final @Nullable BinaryRow binaryRow;
 
     /** Transaction id. Not {@code null} iff this is a write-intent. */
@@ -90,11 +90,11 @@ public class ReadResult {
     }
 
     /**
-     * Returns binary row representation of the data.
+     * Returns binary row representation of the data, {@code null} if {@link #isEmpty()}.
      *
-     * @return Binary row representation of the data.
+     * @return Binary row representation of the data, {@code null} if {@link #isEmpty()}.
      */
-    public BinaryRow binaryRow() {
+    public @Nullable BinaryRow binaryRow() {
         return binaryRow;
     }
 
@@ -155,6 +155,6 @@ public class ReadResult {
     }
 
     public boolean isEmpty() {
-        return this == EMPTY;
+        return binaryRow == null;
     }
 }
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/ReadResultTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/ReadResultTest.java
new file mode 100644
index 0000000000..9310392568
--- /dev/null
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/ReadResultTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.Test;
+
+class ReadResultTest {
+    @Test
+    void resultInEmptyConstantIsEmpty() {
+        assertTrue(ReadResult.EMPTY.isEmpty());
+    }
+
+    @Test
+    void resultWithNullRowIsEmpty() {
+        ReadResult result = ReadResult.createFromCommitted(null, null);
+
+        assertTrue(result.isEmpty());
+    }
+}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 98e25e1f2f..76c6db0b00 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -48,6 +48,8 @@ import org.apache.ignite.internal.tx.Timestamp;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 /**
  * Base test for MV partition storages.
@@ -155,9 +157,10 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
         assertNull(read(rowId, clock.now()));
     }
 
-    @Test
-    public void testScanOverEmpty() throws Exception {
-        assertEquals(List.of(), convert(scan(clock.now())));
+    @ParameterizedTest
+    @EnumSource
+    public void testScanOverEmpty(ScanTimestampProvider tsProvider) throws Exception {
+        assertEquals(List.of(), convert(scan(tsProvider.scanTimestamp(clock))));
     }
 
     /**
@@ -302,8 +305,11 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
 
         assertEquals(List.of(value1, value2), convert(scan(ts4)));
         assertEquals(List.of(value1, value2), convert(scan(ts5)));
+
+        assertEquals(List.of(value1, value2), convert(scan(HybridTimestamp.MAX_VALUE)));
     }
 
+    @SuppressWarnings("ConstantConditions")
     @Test
     public void testTransactionScanCursorInvariants() throws Exception {
         TestValue value1 = new TestValue(10, "xxx");
@@ -338,6 +344,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
         }
     }
 
+    @SuppressWarnings("ConstantConditions")
     @Test
     public void testTimestampScanCursorInvariants() throws Exception {
         TestValue value11 = new TestValue(10, "xxx");
@@ -409,6 +416,8 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
             assertFalse(cursor.hasNext());
             assertFalse(cursor.hasNext());
 
+            assertThrows(NoSuchElementException.class, () -> cursor.next());
+
             assertThrows(IllegalStateException.class, () -> cursor.committed(commitTs1));
         }
     }
@@ -827,11 +836,12 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
         });
     }
 
-    @Test
-    void scanByTimestampWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite() throws Exception {
+    @ParameterizedTest
+    @EnumSource(ScanTimestampProvider.class)
+    void scanWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite(ScanTimestampProvider tsProvider) throws Exception {
         commitAbortAndAddUncommitted();
 
-        try (Cursor<ReadResult> cursor = storage.scan(clock.now())) {
+        try (Cursor<ReadResult> cursor = storage.scan(tsProvider.scanTimestamp(clock))) {
             BinaryRow foundRow = cursor.next().binaryRow();
 
             assertRowMatches(foundRow, binaryRow3);
@@ -1086,23 +1096,12 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
         }
     }
 
-    @Test
-    void testScanWithWriteIntent() throws Exception {
-        RowId rowId1 = new RowId(PARTITION_ID);
-
-        HybridTimestamp commit1ts = clock.now();
-
-        storage.runConsistently(() -> {
-            addWrite(rowId1, binaryRow, newTransactionId());
-
-            commitWrite(rowId1, commit1ts);
+    @ParameterizedTest
+    @EnumSource(ScanTimestampProvider.class)
+    void testScanWithWriteIntent(ScanTimestampProvider tsProvider) throws Exception {
+        HybridTimestamp commitTs = addCommittedVersionAndWriteIntent();
 
-            addWrite(rowId1, binaryRow2, newTransactionId());
-
-            return null;
-        });
-
-        try (PartitionTimestampCursor cursor = storage.scan(clock.now())) {
+        try (PartitionTimestampCursor cursor = storage.scan(tsProvider.scanTimestamp(clock))) {
             assertTrue(cursor.hasNext());
 
             ReadResult next = cursor.next();
@@ -1111,12 +1110,30 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
 
             assertRowMatches(next.binaryRow(), binaryRow2);
 
-            BinaryRow committedRow = cursor.committed(next.newestCommitTimestamp());
+            BinaryRow committedRow = cursor.committed(commitTs);
 
             assertRowMatches(committedRow, binaryRow);
         }
     }
 
+    private HybridTimestamp addCommittedVersionAndWriteIntent() {
+        RowId rowId = new RowId(PARTITION_ID);
+
+        HybridTimestamp commitTs = clock.now();
+
+        storage.runConsistently(() -> {
+            addWrite(rowId, binaryRow, newTransactionId());
+
+            commitWrite(rowId, commitTs);
+
+            addWrite(rowId, binaryRow2, newTransactionId());
+
+            return null;
+        });
+
+        return commitTs;
+    }
+
     @Test
     void testScanVersionsWithWriteIntent() throws Exception {
         RowId rowId = new RowId(PARTITION_ID, 100, 0);
@@ -1141,8 +1158,8 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
 
         assertEquals(2, list.size());
 
-        for (int i = 0; i < list.size(); i++) {
-            assertEquals(key, list.get(i).getKey());
+        for (IgniteBiTuple<TestKey, TestValue> objects : list) {
+            assertEquals(key, objects.getKey());
         }
 
         assertEquals(value2, list.get(0).getValue());
@@ -1235,6 +1252,72 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
         }
     }
 
+    @ParameterizedTest
+    @EnumSource(ScanTimestampProvider.class)
+    public void scanCursorHasNextReturnsFalseEachTimeAfterExhaustion(ScanTimestampProvider tsProvider) throws Exception {
+        RowId rowId = insert(binaryRow, txId);
+        commitWrite(rowId, clock.now());
+
+        try (PartitionTimestampCursor cursor = scan(tsProvider.scanTimestamp(clock))) {
+            cursor.next();
+
+            assertFalse(cursor.hasNext());
+            //noinspection ConstantConditions
+            assertFalse(cursor.hasNext());
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(ScanTimestampProvider.class)
+    public void scanDoesNotSeeTombstonesWhenTombstoneIsNotCommitted(ScanTimestampProvider tsProvider) throws Exception {
+        testScanDoesNotSeeTombstones(tsProvider, false);
+    }
+
+    @ParameterizedTest
+    @EnumSource(ScanTimestampProvider.class)
+    public void scanDoesNotSeeTombstonesWhenTombstoneIsCommitted(ScanTimestampProvider tsProvider) throws Exception {
+        testScanDoesNotSeeTombstones(tsProvider, true);
+    }
+
+    private void testScanDoesNotSeeTombstones(ScanTimestampProvider scantsProvider, boolean commitRemoval) throws Exception {
+        RowId rowId = insert(binaryRow, txId);
+        commitWrite(rowId, clock.now());
+
+        addWrite(rowId, null, newTransactionId());
+        if (commitRemoval) {
+            commitWrite(rowId, clock.now());
+        }
+
+        assertScanSeesNothing(scantsProvider);
+    }
+
+    private void assertScanSeesNothing(ScanTimestampProvider scanTsProvider) throws Exception {
+        try (PartitionTimestampCursor cursor = scan(scanTsProvider.scanTimestamp(clock))) {
+            assertFalse(cursor.hasNext());
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(ScanTimestampProvider.class)
+    void committedMethodCallDoesNotInterfereWithIteratingOverScanCursor(ScanTimestampProvider scanTsProvider) throws Exception {
+        RowId rowId1 = insert(binaryRow, txId);
+        HybridTimestamp commitTs1 = clock.now();
+        commitWrite(rowId1, commitTs1);
+
+        insert(binaryRow2, txId);
+
+        try (PartitionTimestampCursor cursor = scan(scanTsProvider.scanTimestamp(clock))) {
+            cursor.next();
+
+            cursor.committed(commitTs1);
+
+            ReadResult result2 = cursor.next();
+            assertRowMatches(result2.binaryRow(), binaryRow2);
+
+            assertFalse(cursor.hasNext());
+        }
+    }
+
     /**
      * Returns row id that is lexicographically smaller (by the value of one) than the argument.
      *
@@ -1253,4 +1336,21 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
 
         return new RowId(value.partitionId(), msb, lsb);
     }
+
+    private enum ScanTimestampProvider {
+        NOW {
+            @Override
+            HybridTimestamp scanTimestamp(HybridClock clock) {
+                return clock.now();
+            }
+        },
+        MAX_VALUE {
+            @Override
+            HybridTimestamp scanTimestamp(HybridClock clock) {
+                return HybridTimestamp.MAX_VALUE;
+            }
+        };
+
+        abstract HybridTimestamp scanTimestamp(HybridClock clock);
+    }
 }
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 60dccdc65e..3297c8cbc6 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -73,8 +73,6 @@ import org.jetbrains.annotations.Nullable;
 public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitionStorage {
     private static final byte[] TOMBSTONE_PAYLOAD = new byte[0];
 
-    private static final Predicate<BinaryRow> MATCH_ALL = row -> true;
-
     private static final Predicate<HybridTimestamp> ALWAYS_LOAD_VALUE = timestamp -> true;
 
     protected final int partitionId;
@@ -260,32 +258,6 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         }
     }
 
-    /**
-     * Reads either the committed value from the storage or the uncommitted value belonging to given transaction.
-     *
-     * @param rowId Row id.
-     * @param txId Transaction id.
-     * @return Read result that corresponds to the key or {@code null} if value is not found.
-     * @throws TxIdMismatchException If there's another pending update associated with different transaction id.
-     * @throws StorageException If failed to read data from the storage.
-     */
-    // TODO: IGNITE-17864 Optimize scan(HybridTimestamp.MAX_VALUE) and read(HybridTimestamp.MAX_VALUE)
-    @Deprecated
-    public @Nullable BinaryRow read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException {
-        if (rowId.partitionId() != partitionId) {
-            throw new IllegalArgumentException(
-                    String.format("RowId partition [%d] is not equal to storage partition [%d].", rowId.partitionId(), partitionId));
-        }
-
-        VersionChain versionChain = findVersionChain(rowId);
-
-        if (versionChain == null) {
-            return null;
-        }
-
-        return findLatestRowVersion(versionChain, txId, MATCH_ALL);
-    }
-
     @Override
     public ReadResult read(RowId rowId, HybridTimestamp timestamp) throws StorageException {
         if (rowId.partitionId() != partitionId) {
@@ -299,7 +271,15 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
             return ReadResult.EMPTY;
         }
 
-        return findRowVersionByTimestamp(versionChain, timestamp);
+        if (lookingForLatestVersion(timestamp)) {
+            return findLatestRowVersion(versionChain);
+        } else {
+            return findRowVersionByTimestamp(versionChain, timestamp);
+        }
+    }
+
+    private boolean lookingForLatestVersion(HybridTimestamp timestamp) {
+        return timestamp == HybridTimestamp.MAX_VALUE;
     }
 
     private @Nullable VersionChain findVersionChain(RowId rowId) {
@@ -310,26 +290,18 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         }
     }
 
-    private @Nullable BinaryRow findLatestRowVersion(VersionChain versionChain, UUID txId, Predicate<BinaryRow> keyFilter) {
+    private ReadResult findLatestRowVersion(VersionChain versionChain) {
         RowVersion rowVersion = readRowVersion(versionChain.headLink(), ALWAYS_LOAD_VALUE);
 
-        ByteBufferRow row = rowVersionToBinaryRow(rowVersion);
-
-        if (keyFilter != null && !keyFilter.test(row)) {
-            return null;
-        }
-
         if (versionChain.isUncommitted()) {
-            UUID chainTxId = versionChain.transactionId();
+            assert versionChain.transactionId() != null;
 
-            assert chainTxId != null;
-
-            throwIfChainBelongsToAnotherTx(versionChain, txId);
+            return writeIntentToResult(versionChain, rowVersion, null);
+        } else {
+            ByteBufferRow row = rowVersionToBinaryRow(rowVersion);
 
-            return row;
+            return ReadResult.createFromCommitted(row, rowVersion.timestamp());
         }
-
-        return row;
     }
 
     private RowVersion readRowVersion(long nextLink, Predicate<HybridTimestamp> loadValue) {
@@ -682,24 +654,9 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         }
     }
 
-    /**
-     * Scans the partition and returns a cursor of values. All filtered values must either be uncommitted in the current transaction
-     * or already committed in a different transaction.
-     *
-     * @param keyFilter Key filter. Binary rows passed to the filter may or may not have a value, filter should only check keys.
-     * @param txId Transaction id.
-     * @return Cursor.
-     * @throws StorageException If failed to read data from the storage.
-     */
-    // TODO: IGNITE-17864 Optimize scan(HybridTimestamp.MAX_VALUE) and read(HybridTimestamp.MAX_VALUE)
-    @Deprecated
-    public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, UUID txId) throws TxIdMismatchException, StorageException {
-        return internalScan(keyFilter, txId);
-    }
-
     @Override
     public PartitionTimestampCursor scan(HybridTimestamp timestamp) throws StorageException {
-        assert timestamp != null;
+        Objects.requireNonNull(timestamp, "timestamp is null");
 
         Cursor<VersionChain> treeCursor;
 
@@ -709,21 +666,11 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
             throw new StorageException("Find failed", e);
         }
 
-        return new TimestampCursor(treeCursor, timestamp);
-    }
-
-    private Cursor<BinaryRow> internalScan(Predicate<BinaryRow> keyFilter, UUID txId) {
-        assert txId != null;
-
-        Cursor<VersionChain> treeCursor;
-
-        try {
-            treeCursor = versionChainTree.find(null, null);
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Find failed", e);
+        if (lookingForLatestVersion(timestamp)) {
+            return new LatestVersionsCursor(treeCursor);
+        } else {
+            return new TimestampCursor(treeCursor, timestamp);
         }
-
-        return new TransactionIdCursor(treeCursor, keyFilter, txId);
     }
 
     @Override
@@ -765,25 +712,67 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         // TODO: IGNITE-17132 Implement it
     }
 
+    private abstract class BasePartitionTimestampCursor implements PartitionTimestampCursor {
+        protected final Cursor<VersionChain> treeCursor;
+
+        @Nullable
+        protected ReadResult nextRead = null;
+
+        @Nullable
+        protected VersionChain currentChain = null;
+
+        protected BasePartitionTimestampCursor(Cursor<VersionChain> treeCursor) {
+            this.treeCursor = treeCursor;
+        }
+
+        @Override
+        public final ReadResult next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException("The cursor is exhausted");
+            }
+
+            assert nextRead != null;
+
+            ReadResult res = nextRead;
+
+            nextRead = null;
+
+            return res;
+        }
+
+        @Override
+        public void close() throws Exception {
+            treeCursor.close();
+        }
+
+        @Override
+        public @Nullable BinaryRow committed(HybridTimestamp timestamp) {
+            if (currentChain == null) {
+                throw new IllegalStateException();
+            }
+
+            ReadResult result = findRowVersionByTimestamp(currentChain, timestamp);
+            if (result.isEmpty()) {
+                return null;
+            }
+
+            // We don't check if row conforms the key filter here, because we've already checked it.
+            return result.binaryRow();
+        }
+    }
+
     /**
      * Implementation of the {@link PartitionTimestampCursor} over the page memory storage.
      * See {@link PartitionTimestampCursor} for the details on the API.
      */
-    private class TimestampCursor implements PartitionTimestampCursor {
-        private final Cursor<VersionChain> treeCursor;
-
+    private class TimestampCursor extends BasePartitionTimestampCursor {
         private final HybridTimestamp timestamp;
 
-        @Nullable
-        private ReadResult nextRead = null;
-
-        @Nullable
-        private VersionChain currentChain = null;
-
         private boolean iterationExhausted = false;
 
         public TimestampCursor(Cursor<VersionChain> treeCursor, HybridTimestamp timestamp) {
-            this.treeCursor = treeCursor;
+            super(treeCursor);
+
             this.timestamp = timestamp;
         }
 
@@ -807,50 +796,18 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
                 }
 
                 VersionChain chain = treeCursor.next();
-                ReadResult res = findRowVersionByTimestamp(chain, timestamp);
+                ReadResult result = findRowVersionByTimestamp(chain, timestamp);
 
-                if (res.isEmpty()) {
+                if (result.isEmpty()) {
                     continue;
                 }
 
-                nextRead = res;
+                nextRead = result;
                 currentChain = chain;
 
                 return true;
             }
         }
-
-        @Override
-        public ReadResult next() {
-            if (!hasNext()) {
-                throw new NoSuchElementException("The cursor is exhausted");
-            }
-
-            assert nextRead != null;
-
-            ReadResult res = nextRead;
-
-            nextRead = null;
-
-            return res;
-        }
-
-        @Override
-        public void close() {
-            // No-op.
-        }
-
-        @Override
-        public @Nullable BinaryRow committed(HybridTimestamp timestamp) {
-            if (currentChain == null) {
-                throw new IllegalStateException();
-            }
-
-            ReadResult res = findRowVersionByTimestamp(currentChain, timestamp);
-
-            // We don't check if row conforms the key filter here, because we've already checked it.
-            return res.binaryRow();
-        }
     }
 
     /**
@@ -858,30 +815,16 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
      * Scans the partition and returns a cursor of values. All filtered values must either be uncommitted in the current transaction
      * or already committed in a different transaction.
      */
-    private class TransactionIdCursor implements Cursor<BinaryRow> {
-        private final Cursor<VersionChain> treeCursor;
-
-        private final Predicate<BinaryRow> keyFilter;
-
-        private final @Nullable UUID transactionId;
-
-        private BinaryRow nextRow = null;
-
+    private class LatestVersionsCursor extends BasePartitionTimestampCursor {
         private boolean iterationExhausted = false;
 
-        public TransactionIdCursor(
-                Cursor<VersionChain> treeCursor,
-                Predicate<BinaryRow> keyFilter,
-                @Nullable UUID transactionId
-        ) {
-            this.treeCursor = treeCursor;
-            this.keyFilter = keyFilter;
-            this.transactionId = transactionId;
+        public LatestVersionsCursor(Cursor<VersionChain> treeCursor) {
+            super(treeCursor);
         }
 
         @Override
         public boolean hasNext() {
-            if (nextRow != null) {
+            if (nextRead != null) {
                 return true;
             }
 
@@ -896,32 +839,17 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
                 }
 
                 VersionChain chain = treeCursor.next();
-                BinaryRow row = findLatestRowVersion(chain, transactionId, keyFilter);
+                ReadResult result = findLatestRowVersion(chain);
 
-                if (row != null) {
-                    nextRow = row;
-                    return true;
+                if (result.isEmpty()) {
+                    continue;
                 }
-            }
-        }
 
-        @Override
-        public BinaryRow next() {
-            if (!hasNext()) {
-                throw new NoSuchElementException("The cursor is exhausted");
-            }
-
-            assert nextRow != null;
-
-            BinaryRow row = nextRow;
-            nextRow = null;
-
-            return row;
-        }
+                nextRead = result;
+                currentChain = chain;
 
-        @Override
-        public void close() {
-            // No-op.
+                return true;
+            }
         }
     }
 }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 85375c72ac..1a598edd3b 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -31,10 +31,10 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.charset.StandardCharsets;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiConsumer;
-import java.util.function.Predicate;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
 import org.apache.ignite.internal.rocksdb.RocksUtils;
@@ -468,31 +468,9 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         }
     }
 
-    /**
-     * Reads either the committed value from the storage or the uncommitted value belonging to given transaction.
-     *
-     * @param rowId Row id.
-     * @param txId Transaction id.
-     * @return Read result that corresponds to the key or {@code null} if value is not found.
-     * @throws TxIdMismatchException If there's another pending update associated with different transaction id.
-     * @throws StorageException If failed to read data from the storage.
-     */
-    // TODO: IGNITE-17864 Optimize scan(HybridTimestamp.MAX_VALUE) and read(HybridTimestamp.MAX_VALUE)
-    @Deprecated
-    public @Nullable BinaryRow read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException {
-        return read(rowId, null, txId).binaryRow();
-    }
-
     /** {@inheritDoc} */
     @Override
     public ReadResult read(RowId rowId, HybridTimestamp timestamp) throws StorageException {
-        return read(rowId, timestamp, null);
-    }
-
-    private ReadResult read(RowId rowId, @Nullable HybridTimestamp timestamp, @Nullable UUID txId)
-            throws TxIdMismatchException, StorageException {
-        assert timestamp == null ^ txId == null;
-
         if (rowId.partitionId() != partitionId) {
             throw new IllegalArgumentException(
                     String.format("RowId partition [%d] is not equal to storage partition [%d].", rowId.partitionId(), partitionId));
@@ -503,55 +481,65 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
         try (
                 // Set next partition as an upper bound.
-                var readOpts = new ReadOptions().setIterateUpperBound(upperBound);
-                RocksIterator baseIterator = db.newIterator(cf, readOpts);
+                var readOpts1 = new ReadOptions().setIterateUpperBound(upperBound);
+                RocksIterator baseIterator = db.newIterator(cf, readOpts1);
                 // "count()" check is mandatory. Write batch iterator without any updates just crashes everything.
                 // It's not documented, but this is exactly how it should be used.
                 RocksIterator seekIterator = writeBatch != null && writeBatch.count() > 0
                         ? writeBatch.newIteratorWithBase(cf, baseIterator)
                         : baseIterator
         ) {
-            if (timestamp == null) {
-                ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+            if (lookingForLatestVersions(timestamp)) {
+                return readLatestVersion(rowId, seekIterator);
+            } else {
+                return readByTimestamp(seekIterator, rowId, timestamp);
+            }
+        }
+    }
 
-                // Seek to the first appearance of row id if timestamp isn't set.
-                // Since timestamps are sorted from newest to oldest, first occurrence will always be the latest version.
-                // Unfortunately, copy here is unavoidable with current API.
-                assert keyBuf.position() == ROW_PREFIX_SIZE;
-                seekIterator.seek(copyOf(keyBuf.array(), ROW_PREFIX_SIZE));
+    private boolean lookingForLatestVersions(HybridTimestamp timestamp) {
+        return timestamp == HybridTimestamp.MAX_VALUE;
+    }
 
-                if (invalid(seekIterator)) {
-                    // No data at all.
-                    return ReadResult.EMPTY;
-                }
+    private ReadResult readLatestVersion(RowId rowId, RocksIterator seekIterator) {
+        ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+
+        // Seek to the first appearance of row id if timestamp isn't set.
+        // Since timestamps are sorted from newest to oldest, first occurrence will always be the latest version.
+        // Unfortunately, copy here is unavoidable with current API.
+        assert keyBuf.position() == ROW_PREFIX_SIZE;
+        seekIterator.seek(copyOf(keyBuf.array(), ROW_PREFIX_SIZE));
 
-                ByteBuffer readKeyBuf = MV_KEY_BUFFER.get().position(0).limit(MAX_KEY_SIZE);
+        if (invalid(seekIterator)) {
+            // No data at all.
+            return ReadResult.EMPTY;
+        }
 
-                int keyLength = seekIterator.key(readKeyBuf);
+        ByteBuffer readKeyBuf = MV_KEY_BUFFER.get().position(0).limit(MAX_KEY_SIZE);
 
-                if (!matches(rowId, readKeyBuf)) {
-                    // Wrong row id.
-                    return ReadResult.EMPTY;
-                }
+        int keyLength = seekIterator.key(readKeyBuf);
 
-                boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
+        if (!matches(rowId, readKeyBuf)) {
+            // It is already a different row, so no version exists for our rowId.
+            return ReadResult.EMPTY;
+        }
 
-                byte[] valueBytes = seekIterator.value();
+        boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
 
-                if (!isWriteIntent) {
-                    // There is no write-intent, return latest committed row.
-                    return wrapCommittedValue(valueBytes, readTimestamp(readKeyBuf));
-                }
+        byte[] valueBytes = seekIterator.value();
 
-                assert valueBytes != null;
+        return readResultFromKeyAndValue(isWriteIntent, readKeyBuf, valueBytes);
+    }
 
-                validateTxId(valueBytes, txId);
+    private static ReadResult readResultFromKeyAndValue(boolean isWriteIntent, ByteBuffer keyBuf, byte[] valueBytes) {
+        assert valueBytes != null;
 
-                return wrapUncommittedValue(valueBytes, null);
-            } else {
-                return readByTimestamp(seekIterator, rowId, timestamp);
-            }
+        if (!isWriteIntent) {
+            // There is no write-intent, return latest committed row.
+            return wrapCommittedValue(valueBytes, readTimestamp(keyBuf));
         }
+
+        return wrapUncommittedValue(valueBytes, null);
     }
 
     /**
@@ -562,7 +550,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
      * @param timestamp Timestamp.
      * @return Read result.
      */
-    private @Nullable ReadResult readByTimestamp(RocksIterator seekIterator, RowId rowId, HybridTimestamp timestamp) {
+    private ReadResult readByTimestamp(RocksIterator seekIterator, RowId rowId, HybridTimestamp timestamp) {
         ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
 
         // Put timestamp restriction according to N2O timestamps order.
@@ -685,14 +673,14 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
      * Checks if row id matches the one written in the key buffer. Note: this operation changes the position in the buffer.
      *
      * @param rowId Row id.
-     * @param keyByf Key buffer.
+     * @param keyBuf Key buffer.
      * @return {@code true} if row id matches the key buffer, {@code false} otherwise.
      */
-    private static boolean matches(RowId rowId, ByteBuffer keyByf) {
+    private static boolean matches(RowId rowId, ByteBuffer keyBuf) {
         // Comparison starts from the position of the row id.
-        keyByf.position(ROW_ID_OFFSET);
+        keyBuf.position(ROW_ID_OFFSET);
 
-        return rowId.mostSignificantBits() == normalize(keyByf.getLong()) && rowId.leastSignificantBits() == normalize(keyByf.getLong());
+        return rowId.mostSignificantBits() == normalize(keyBuf.getLong()) && rowId.leastSignificantBits() == normalize(keyBuf.getLong());
     }
 
     @Override
@@ -718,13 +706,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
                 boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
 
-                if (!isWriteIntent) {
-                    return wrapCommittedValue(value, readTimestamp(ByteBuffer.wrap(key).order(KEY_BYTE_ORDER)));
-                }
-
-                assert value != null;
-
-                return wrapUncommittedValue(value, null);
+                return readResultFromKeyAndValue(isWriteIntent, ByteBuffer.wrap(key).order(KEY_BYTE_ORDER), value);
             }
 
             @Override
@@ -736,264 +718,28 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         };
     }
 
-    /**
-     * Scans the partition and returns a cursor of values. All filtered values must either be uncommitted in the current transaction
-     * or already committed in a different transaction.
-     *
-     * @param keyFilter Key filter. Binary rows passed to the filter may or may not have a value, filter should only check keys.
-     * @param txId Transaction id.
-     * @return Cursor.
-     * @throws StorageException If failed to read data from the storage.
-     */
     // TODO: IGNITE-16914 Play with prefix settings and benchmark results.
-    // TODO: IGNITE-17864 Optimize scan(HybridTimestamp.MAX_VALUE) and read(HybridTimestamp.MAX_VALUE)
-    @Deprecated
-    public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, UUID txId) throws TxIdMismatchException, StorageException {
-        assert txId != null;
-
-        RocksIterator it = db.newIterator(cf, scanReadOptions);
-
-        // Seek iterator to the beginning of the partition.
-        it.seek(partitionStartPrefix());
-
-        // Here's seek buffer itself. Originally it contains a valid partition id, row id payload that's filled with zeroes, and maybe
-        // a timestamp value. Zero row id guarantees that it's lexicographically less than or equal to any other row id stored in the
-        // partition.
-        // Byte buffer from a thread-local field can't be used here, because of two reasons:
-        //  - no one guarantees that there will only be a single cursor;
-        //  - no one guarantees that returned cursor will not be used by other threads.
-        // The thing is, we need this buffer to preserve its content between invocations of "hasNext" method.
-        ByteBuffer seekKeyBuf = ByteBuffer.allocate(ROW_PREFIX_SIZE).order(KEY_BYTE_ORDER).putShort((short) partitionId);
-
-        return new Cursor<>() {
-            /** Cached value for {@link #next()} method. Also optimizes the code of {@link #hasNext()}. */
-            private BinaryRow next;
-
-            /** {@inheritDoc} */
-            @Override
-            public boolean hasNext() {
-                // Fast-path for consecutive invocations.
-                if (next != null) {
-                    return true;
-                }
-
-                // Prepare direct buffer slice to read keys from the iterator.
-                ByteBuffer directBuffer = MV_KEY_BUFFER.get().position(0);
-
-                while (true) {
-                    // We should do after each seek. Here in particular it means one of two things:
-                    //  - partition is empty;
-                    //  - iterator exhausted all the data in partition.
-                    if (invalid(it)) {
-                        return false;
-                    }
-
-                    // At this point, seekKeyBuf should contain row id that's above the one we already scanned, but not greater than any
-                    // other row id in partition. When we start, row id is filled with zeroes. Value during the iteration is described later
-                    // in this code. Now let's describe what we'll find, assuming that iterator found something:
-                    //  - if timestamp is null:
-                    //      - this seek will find the newest version of the next row in iterator. Exactly what we need.
-                    //  - if timestamp is not null:
-                    //      - suppose that seek key buffer has the following value: "| P0 | R0 | T0 |" (partition, row id, timestamp)
-                    //        and iterator finds something, let's denote it as "| P0 | R1 | T1 |" (partition must match). Again, there are
-                    //        few possibilities here:
-                    //          - R1 == R0, this means a match. By the rules of ordering we derive that T1 >= T0. Timestamps are stored in
-                    //            descending order, this means that we found exactly what's needed.
-                    //          - R1 > R0, this means that we found next row and T1 is either missing (pending row) or represents the latest
-                    //            version of the row. It doesn't matter in this case, because this row id will be reused to find its value
-                    //            at time T0. Additional "seek" will be required to do it.
-                    it.seek(seekKeyBuf.array());
-
-                    // Finish scan if nothing was found.
-                    if (invalid(it)) {
-                        return false;
-                    }
-
-                    // Read the actual key into a direct buffer.
-                    int keyLength = it.key(directBuffer.limit(MAX_KEY_SIZE));
-
-                    boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
-
-                    directBuffer.limit(ROW_PREFIX_SIZE);
-
-                    // Copy actual row id into a "seekKeyBuf" buffer.
-                    seekKeyBuf.putLong(ROW_ID_OFFSET, directBuffer.getLong(ROW_ID_OFFSET));
-                    seekKeyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, directBuffer.getLong(ROW_ID_OFFSET + Long.BYTES));
-
-                    // This one might look tricky. We finished processing next row. There are three options:
-                    //  - "found" flag is false - there's no fitting version of the row. We'll continue to next iteration;
-                    //  - value is empty, we found a tombstone. We'll continue to next iteration as well;
-                    //  - value is not empty and everything's good. We'll cache it and return from method.
-                    // In all three cases we need to prepare the value of "seekKeyBuf" so that it has not-yet-scanned row id in it.
-                    // the only valid way to do so is to treat row id payload as one big unsigned integer in Big Endian and increment it.
-                    // It's important to note that increment may overflow. In this case "carry flag" will go into incrementing partition id.
-                    // This is fine for three reasons:
-                    //  - iterator has an upper bound, following "seek" will result in invalid iterator state.
-                    //  - partition id itself cannot be overflown, because it's limited with a constant less than 0xFFFF.
-                    // It's something like 65500, I think.
-                    //  - "seekKeyBuf" buffer value will not be used after that, so it's ok if we corrupt its data (in every other instance,
-                    //    buffer starts with a valid partition id, which is set during buffer's initialization).
-                    incrementRowId(seekKeyBuf);
-
-                    // Cache row and return "true" if it's found and not a tombstone.
-                    byte[] valueBytes = it.value();
-
-                    BinaryRow binaryRow = wrapValueIntoBinaryRow(valueBytes, isWriteIntent);
-
-                    if (binaryRow != null && (keyFilter == null || keyFilter.test(binaryRow))) {
-                        if (isWriteIntent) {
-                            validateTxId(valueBytes, txId);
-                        }
-
-                        next = binaryRow;
-
-                        return true;
-                    }
-                }
-            }
-
-            /** {@inheritDoc} */
-            @Override
-            public BinaryRow next() {
-                if (!hasNext()) {
-                    throw new NoSuchElementException();
-                }
-
-                BinaryRow res = next;
-
-                next = null;
-
-                return res;
-            }
-
-            /** {@inheritDoc} */
-            @Override
-            public void close() throws Exception {
-                IgniteUtils.closeAll(it);
-            }
-        };
-    }
-
     /** {@inheritDoc} */
     @Override
     public PartitionTimestampCursor scan(HybridTimestamp timestamp) throws StorageException {
-        assert timestamp != null;
-
-        RocksIterator it = db.newIterator(cf, scanReadOptions);
-
-        // You can see the motivation behind the usage of a separate (not thread-local) buffer in the transaction id
-        // cursor code.
-        ByteBuffer seekKeyBuf = ByteBuffer.allocate(MAX_KEY_SIZE).order(KEY_BYTE_ORDER).putShort((short) partitionId);
-
-        return new PartitionTimestampCursor() {
-
-            private RowId currentRowId;
-
-            /** Cached value for {@link #next()} method. Also optimizes the code of {@link #hasNext()}. */
-            private ReadResult next;
-
-            private void setKeyBuffer(ByteBuffer keyBuf, RowId rowId, HybridTimestamp timestamp) {
-                keyBuf.putLong(ROW_ID_OFFSET, normalize(rowId.mostSignificantBits()));
-                keyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, normalize(rowId.leastSignificantBits()));
-
-                putTimestamp(keyBuf.position(ROW_PREFIX_SIZE), timestamp);
-
-                keyBuf.position(0);
-            }
-
-            @Override
-            public @Nullable BinaryRow committed(HybridTimestamp timestamp) {
-                if (currentRowId == null) {
-                    throw new IllegalStateException();
-                }
-
-                setKeyBuffer(seekKeyBuf, currentRowId, timestamp);
-
-                it.seek(seekKeyBuf.array());
-
-                ReadResult readResult = handleReadByTimestampIterator(it, currentRowId, timestamp, seekKeyBuf);
-
-                if (readResult.isEmpty()) {
-                    return null;
-                }
-
-                // We don't check if row conforms the key filter here, because we've already checked it.
-                return readResult.binaryRow();
-            }
-
-            /** {@inheritDoc} */
-            @Override
-            public boolean hasNext() {
-                // Fast-path for consecutive invocations.
-                if (next != null) {
-                    return true;
-                }
-
-                if (currentRowId != null) {
-                    setKeyBuffer(seekKeyBuf, currentRowId, timestamp);
-                    incrementRowId(seekKeyBuf);
-                }
-
-                currentRowId = null;
-
-                // Prepare direct buffer slice to read keys from the iterator.
-                ByteBuffer directBuffer = MV_KEY_BUFFER.get().position(0);
-
-                while (true) {
-                    it.seek(seekKeyBuf.array());
-
-                    if (invalid(it)) {
-                        return false;
-                    }
+        Objects.requireNonNull(timestamp, "timestamp is null");
 
-                    // We need to figure out what current row id is.
-                    it.key(directBuffer.position(0));
-
-                    directBuffer.position(ROW_ID_OFFSET);
-
-                    RowId rowId = getRowId(directBuffer);
-
-                    setKeyBuffer(seekKeyBuf, rowId, timestamp);
-
-                    // Seek to current row id + timestamp.
-                    it.seek(seekKeyBuf.array());
-
-                    ReadResult readResult = handleReadByTimestampIterator(it, rowId, timestamp, seekKeyBuf);
-
-                    if (readResult.isEmpty()) {
-                        // Seek to next row id as we found nothing that matches.
-                        incrementRowId(seekKeyBuf);
-
-                        continue;
-                    }
-
-                    next = readResult;
-                    currentRowId = rowId;
-
-                    return true;
-                }
-            }
-
-            /** {@inheritDoc} */
-            @Override
-            public ReadResult next() {
-                if (!hasNext()) {
-                    throw new NoSuchElementException();
-                }
-
-                ReadResult res = next;
+        if (lookingForLatestVersions(timestamp)) {
+            return new ScanLatestVersionsCursor();
+        } else {
+            return new ScanByTimestampCursor(timestamp);
+        }
+    }
 
-                next = null;
+    private void setKeyBuffer(ByteBuffer keyBuf, RowId rowId, @Nullable HybridTimestamp timestamp) {
+        keyBuf.putLong(ROW_ID_OFFSET, normalize(rowId.mostSignificantBits()));
+        keyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, normalize(rowId.leastSignificantBits()));
 
-                return res;
-            }
+        if (timestamp != null) {
+            putTimestamp(keyBuf.position(ROW_PREFIX_SIZE), timestamp);
+        }
 
-            /** {@inheritDoc} */
-            @Override
-            public void close() throws Exception {
-                IgniteUtils.closeAll(it);
-            }
-        };
+        keyBuf.position(0);
     }
 
     @Override
@@ -1013,8 +759,6 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
             it.key(readKeyBuf);
 
-            readKeyBuf.position(ROW_ID_OFFSET);
-
             return getRowId(readKeyBuf);
         } finally {
             keyBuf.limit(MAX_KEY_SIZE);
@@ -1047,8 +791,10 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         buf.position(0);
     }
 
-    private RowId getRowId(ByteBuffer readKeyBuf) {
-        return new RowId(partitionId, normalize(readKeyBuf.getLong()), normalize(readKeyBuf.getLong()));
+    private RowId getRowId(ByteBuffer buffer) {
+        buffer.position(ROW_ID_OFFSET);
+
+        return new RowId(partitionId, normalize(buffer.getLong()), normalize(buffer.getLong()));
     }
 
     @Override
@@ -1088,7 +834,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
                 boolean valueHasTxId = keyBytes.length == ROW_PREFIX_SIZE;
 
                 if (!isTombstone(valueBytes, valueHasTxId)) {
-                    ByteBuffer keyBuf = ByteBuffer.wrap(keyBytes).order(KEY_BYTE_ORDER).position(ROW_ID_OFFSET);
+                    ByteBuffer keyBuf = ByteBuffer.wrap(keyBytes).order(KEY_BYTE_ORDER);
 
                     RowId rowId = getRowId(keyBuf);
 
@@ -1295,4 +1041,201 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     private static boolean isTombstone(byte[] valueBytes, boolean hasTxId) {
         return valueBytes.length == (hasTxId ? VALUE_HEADER_SIZE : 0);
     }
+
+    private abstract class BasePartitionTimestampCursor implements PartitionTimestampCursor {
+        protected final RocksIterator it = db.newIterator(cf, scanReadOptions);
+
+        // Here's seek buffer itself. Originally it contains a valid partition id, row id payload that's filled with zeroes, and maybe
+        // a timestamp value. Zero row id guarantees that it's lexicographically less than or equal to any other row id stored in the
+        // partition.
+        // Byte buffer from a thread-local field can't be used here, because of two reasons:
+        //  - no one guarantees that there will only be a single cursor;
+        //  - no one guarantees that returned cursor will not be used by other threads.
+        // The thing is, we need this buffer to preserve its content between invocations of "hasNext" method.
+        protected final ByteBuffer seekKeyBuf = ByteBuffer.allocate(MAX_KEY_SIZE).order(KEY_BYTE_ORDER).putShort((short) partitionId);
+
+        protected RowId currentRowId;
+
+        /** Cached value for {@link #next()} method. Also optimizes the code of {@link #hasNext()}. */
+        protected ReadResult next;
+
+        @Override
+        public @Nullable BinaryRow committed(HybridTimestamp timestamp) {
+            Objects.requireNonNull(timestamp, "timestamp is null");
+
+            if (currentRowId == null) {
+                throw new IllegalStateException("currentRowId is null");
+            }
+
+            setKeyBuffer(seekKeyBuf, currentRowId, timestamp);
+
+            it.seek(seekKeyBuf.array());
+
+            ReadResult readResult = handleReadByTimestampIterator(it, currentRowId, timestamp, seekKeyBuf);
+
+            if (readResult.isEmpty()) {
+                return null;
+            }
+
+            return readResult.binaryRow();
+        }
+
+        @Override
+        public final ReadResult next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+
+            ReadResult res = next;
+
+            next = null;
+
+            return res;
+        }
+
+        @Override
+        public final void close() throws Exception {
+            it.close();
+        }
+    }
+
+    private final class ScanLatestVersionsCursor extends BasePartitionTimestampCursor {
+        @Override
+        public boolean hasNext() {
+            // Fast-path for consecutive invocations.
+            if (next != null) {
+                return true;
+            }
+
+            if (currentRowId != null) {
+                setKeyBuffer(seekKeyBuf, currentRowId, null);
+                incrementRowId(seekKeyBuf);
+            }
+
+            currentRowId = null;
+
+            // Prepare direct buffer slice to read keys from the iterator.
+            ByteBuffer directBuffer = MV_KEY_BUFFER.get().position(0);
+
+            while (true) {
+                // At this point, seekKeyBuf should contain row id that's above the one we already scanned, but not greater than any
+                // other row id in partition. When we start, row id is filled with zeroes. Value during the iteration is described later
+                // in this code. Now let's describe what we'll find, assuming that iterator found something:
+                //  - if timestamp is null:
+                //      - this seek will find the newest version of the next row in iterator. Exactly what we need.
+                //  - if timestamp is not null:
+                //      - suppose that seek key buffer has the following value: "| P0 | R0 | T0 |" (partition, row id, timestamp)
+                //        and iterator finds something, let's denote it as "| P0 | R1 | T1 |" (partition must match). Again, there are
+                //        few possibilities here:
+                //          - R1 == R0, this means a match. By the rules of ordering we derive that T1 >= T0. Timestamps are stored in
+                //            descending order, this means that we found exactly what's needed.
+                //          - R1 > R0, this means that we found next row and T1 is either missing (pending row) or represents the latest
+                //            version of the row. It doesn't matter in this case, because this row id will be reused to find its value
+                //            at time T0. Additional "seek" will be required to do it.
+                it.seek(seekKeyBuf.array());
+
+                // Finish scan if nothing was found.
+                if (invalid(it)) {
+                    return false;
+                }
+
+                // Read the actual key into a direct buffer.
+                int keyLength = it.key(directBuffer.limit(MAX_KEY_SIZE));
+
+                boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
+
+                directBuffer.limit(ROW_PREFIX_SIZE);
+
+                // Copy actual row id into a "seekKeyBuf" buffer.
+                seekKeyBuf.putLong(ROW_ID_OFFSET, directBuffer.getLong(ROW_ID_OFFSET));
+                seekKeyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, directBuffer.getLong(ROW_ID_OFFSET + Long.BYTES));
+
+                // This one might look tricky. We finished processing next row. There are three options:
+                //  - "found" flag is false - there's no fitting version of the row. We'll continue to next iteration;
+                //  - value is empty, we found a tombstone. We'll continue to next iteration as well;
+                //  - value is not empty and everything's good. We'll cache it and return from method.
+                // In all three cases we need to prepare the value of "seekKeyBuf" so that it has not-yet-scanned row id in it.
+                // the only valid way to do so is to treat row id payload as one big unsigned integer in Big Endian and increment it.
+                // It's important to note that increment may overflow. In this case "carry flag" will go into incrementing partition id.
+                // This is fine for three reasons:
+                //  - iterator has an upper bound, following "seek" will result in invalid iterator state.
+                //  - partition id itself cannot be overflown, because it's limited with a constant less than 0xFFFF.
+                // It's something like 65500, I think.
+                //  - "seekKeyBuf" buffer value will not be used after that, so it's ok if we corrupt its data (in every other instance,
+                //    buffer starts with a valid partition id, which is set during buffer's initialization).
+                incrementRowId(seekKeyBuf);
+
+                // Cache row and return "true" if it's found and not a tombstone.
+                byte[] valueBytes = it.value();
+
+                directBuffer.limit(keyLength);
+                ReadResult readResult = readResultFromKeyAndValue(isWriteIntent, directBuffer, valueBytes);
+
+                if (!readResult.isEmpty()) {
+                    next = readResult;
+                    currentRowId = getRowId(directBuffer);
+
+                    return true;
+                }
+            }
+        }
+    }
+
+    private final class ScanByTimestampCursor extends BasePartitionTimestampCursor {
+        private final HybridTimestamp timestamp;
+
+        public ScanByTimestampCursor(HybridTimestamp timestamp) {
+            this.timestamp = timestamp;
+        }
+
+        @Override
+        public boolean hasNext() {
+            // Fast-path for consecutive invocations.
+            if (next != null) {
+                return true;
+            }
+
+            if (currentRowId != null) {
+                setKeyBuffer(seekKeyBuf, currentRowId, timestamp);
+                incrementRowId(seekKeyBuf);
+            }
+
+            currentRowId = null;
+
+            // Prepare direct buffer slice to read keys from the iterator.
+            ByteBuffer directBuffer = MV_KEY_BUFFER.get().position(0);
+
+            while (true) {
+                it.seek(seekKeyBuf.array());
+
+                if (invalid(it)) {
+                    return false;
+                }
+
+                // We need to figure out what current row id is.
+                it.key(directBuffer.position(0));
+
+                RowId rowId = getRowId(directBuffer);
+
+                setKeyBuffer(seekKeyBuf, rowId, timestamp);
+
+                // Seek to current row id + timestamp.
+                it.seek(seekKeyBuf.array());
+
+                ReadResult readResult = handleReadByTimestampIterator(it, rowId, timestamp, seekKeyBuf);
+
+                if (readResult.isEmpty()) {
+                    // Seek to next row id as we found nothing that matches.
+                    incrementRowId(seekKeyBuf);
+
+                    continue;
+                }
+
+                next = readResult;
+                currentRowId = rowId;
+
+                return true;
+            }
+        }
+    }
 }