You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/09/27 13:30:14 UTC

[ignite-3] 01/02: IGNITE-17720 Extend MvPartitionStorage scan API with write intent resolution capabilities

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-17720
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 28a299ddd14e6469fe815dbbee71fd84ad59316a
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Tue Sep 27 16:19:07 2022 +0400

    IGNITE-17720 Extend MvPartitionStorage scan API with write intent resolution capabilities
---
 .../internal/storage/MvPartitionStorage.java       |   4 +-
 .../internal/storage/PartitionScanCursor.java      |  35 ++++
 .../apache/ignite/internal/storage/ReadResult.java |   8 +
 .../storage/AbstractMvPartitionStorageTest.java    |  60 +++++--
 .../TestConcurrentHashMapMvPartitionStorage.java   |  67 +++++++-
 .../mv/AbstractPageMemoryMvPartitionStorage.java   | 179 +++++++++++++++------
 .../storage/rocksdb/RocksDbMvPartitionStorage.java | 153 +++++++++++++++++-
 .../distributed/storage/VersionedRowStore.java     |   5 +-
 8 files changed, 438 insertions(+), 73 deletions(-)

diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index de9742bf86..4d3dc080b3 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -228,7 +228,7 @@ public interface MvPartitionStorage extends AutoCloseable {
      * @deprecated Use {@link #scan(Predicate, HybridTimestamp)}
      */
     @Deprecated
-    default Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, Timestamp timestamp) throws StorageException {
+    default PartitionScanCursor scan(Predicate<BinaryRow> keyFilter, Timestamp timestamp) throws StorageException {
         return scan(keyFilter, convertTimestamp(timestamp));
     }
 
@@ -241,7 +241,7 @@ public interface MvPartitionStorage extends AutoCloseable {
      * @throws TxIdMismatchException If there's another pending update associated with different transaction id.
      * @throws StorageException If failed to read data from the storage.
      */
-    Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) throws StorageException;
+    PartitionScanCursor scan(Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) throws StorageException;
 
     /**
      * Returns rows count belongs to current storage.
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/PartitionScanCursor.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/PartitionScanCursor.java
new file mode 100644
index 0000000000..8e76c3131b
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/PartitionScanCursor.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.util.Cursor;
+
+/**
+ * Partition cursor.
+ */
+public interface PartitionScanCursor extends Cursor<ReadResult> {
+    /**
+     * Returns the committed version of the row at the cursor's current row id, as of the given timestamp.
+     *
+     * @param timestamp Commit timestamp.
+     * @return Row or {@code null} if it doesn't exist.
+     */
+    BinaryRow committed(HybridTimestamp timestamp);
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
index 4ebd74fe3a..a1d912c6b2 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java
@@ -125,4 +125,12 @@ public class ReadResult {
     public int commitPartitionId() {
         return commitPartitionId;
     }
+
+    public boolean isWriteIntent() {
+        return transactionId != null;
+    }
+
+    public boolean isEmpty() {
+        return this == EMPTY;
+    }
 }
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 21e01306c8..b88f388d56 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -48,7 +48,6 @@ import org.apache.ignite.internal.tx.Timestamp;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.Pair;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -93,7 +92,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
     /**
      * Scans partition inside of consistency closure.
      */
-    protected Cursor<BinaryRow> scan(Predicate<BinaryRow> filter, HybridTimestamp timestamp) {
+    protected Cursor<ReadResult> scan(Predicate<BinaryRow> filter, HybridTimestamp timestamp) {
         return storage.runConsistently(() -> storage.scan(filter, timestamp));
     }
 
@@ -159,7 +158,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
     @Test
     public void testScanOverEmpty() throws Exception {
         assertEquals(List.of(), convert(scan(row -> true, newTransactionId())));
-        assertEquals(List.of(), convert(scan(row -> true, clock.now())));
+        assertEquals(List.of(), convert0(scan(row -> true, clock.now())));
     }
 
     /**
@@ -312,13 +311,13 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
         HybridTimestamp ts5 = clock.now();
 
         // Full scan with various timestamp values.
-        assertEquals(List.of(), convert(scan(row -> true, ts1)));
+        assertEquals(List.of(), convert0(scan(row -> true, ts1)));
 
-        assertEquals(List.of(value1), convert(scan(row -> true, ts2)));
-        assertEquals(List.of(value1), convert(scan(row -> true, ts3)));
+        assertEquals(List.of(value1), convert0(scan(row -> true, ts2)));
+        assertEquals(List.of(value1), convert0(scan(row -> true, ts3)));
 
-        assertEquals(List.of(value1, value2), convert(scan(row -> true, ts4)));
-        assertEquals(List.of(value1, value2), convert(scan(row -> true, ts5)));
+        assertEquals(List.of(value1, value2), convert0(scan(row -> true, ts4)));
+        assertEquals(List.of(value1, value2), convert0(scan(row -> true, ts5)));
     }
 
     @Test
@@ -364,6 +363,15 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
         }
     }
 
+    private List<TestValue> convert0(Cursor<ReadResult> cursor) throws Exception {
+        try (cursor) {
+            return cursor.stream()
+                    .map((ReadResult rs) -> BaseMvStoragesTest.value(rs.binaryRow()))
+                    .sorted(Comparator.nullsFirst(Comparator.naturalOrder()))
+                    .collect(Collectors.toList());
+        }
+    }
+
     @Test
     void readOfUncommittedRowWithCorrespondingTransactionIdReturnsTheRow() {
         RowId rowId = insert(binaryRow, txId);
@@ -765,12 +773,11 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17720")
     void scanByTimestampWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite() throws Exception {
         commitAbortAndAddUncommitted();
 
-        try (Cursor<BinaryRow> cursor = storage.scan(k -> true, clock.now())) {
-            BinaryRow foundRow = cursor.next();
+        try (Cursor<ReadResult> cursor = storage.scan(k -> true, clock.now())) {
+            BinaryRow foundRow = cursor.next().binaryRow();
 
             assertRowMatches(foundRow, binaryRow3);
 
@@ -1025,6 +1032,37 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
         }
     }
 
+    @Test
+    void testScanWithWriteIntent() throws Exception {
+        RowId rowId1 = new RowId(PARTITION_ID);
+
+        HybridTimestamp commit1ts = clock.now();
+
+        storage.runConsistently(() -> {
+            addWrite(rowId1, binaryRow, newTransactionId());
+
+            commitWrite(rowId1, commit1ts);
+
+            addWrite(rowId1, binaryRow2, newTransactionId());
+
+            return null;
+        });
+
+        try (PartitionScanCursor cursor = storage.scan(r -> true, clock.now())) {
+            assertTrue(cursor.hasNext());
+
+            ReadResult next = cursor.next();
+
+            assertTrue(next.isWriteIntent());
+
+            assertRowMatches(next.binaryRow(), binaryRow2);
+
+            BinaryRow committedRow = cursor.committed(next.newestCommitTimestamp());
+
+            assertRowMatches(committedRow, binaryRow);
+        }
+    }
+
     @Test
     void testScanVersionsWithWriteIntent() throws Exception {
         RowId rowId = new RowId(PARTITION_ID, 100, 0);
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
index 65103d0ae9..be90f55363 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.storage.chm;
 
 import java.util.Iterator;
 import java.util.Map.Entry;
+import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -30,6 +31,7 @@ import java.util.stream.Stream;
 import org.apache.ignite.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.PartitionScanCursor;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
@@ -327,14 +329,65 @@ public class TestConcurrentHashMapMvPartitionStorage implements MvPartitionStora
 
     /** {@inheritDoc} */
     @Override
-    public Cursor<BinaryRow> scan(Predicate<BinaryRow> filter, HybridTimestamp timestamp) {
-        Iterator<BinaryRow> iterator = map.values().stream()
-                .map(versionChain -> read(versionChain, timestamp, null, filter))
-                .map(ReadResult::binaryRow)
-                .filter(Objects::nonNull)
-                .iterator();
+    public PartitionScanCursor scan(Predicate<BinaryRow> filter, HybridTimestamp timestamp) {
+        Iterator<VersionChain> iterator = map.values().iterator();
 
-        return Cursor.fromIterator(iterator);
+        return new PartitionScanCursor() {
+
+            private VersionChain currentChain;
+
+            private ReadResult currentReadResult;
+
+            @Override
+            public BinaryRow committed(HybridTimestamp timestamp) {
+                ReadResult read = read(currentChain, timestamp, null, filter);
+
+                if (read.transactionId() == null) {
+                    return read.binaryRow();
+                }
+
+                return null;
+            }
+
+            @Override
+            public void close() {
+                // No-op.
+            }
+
+            @Override
+            public boolean hasNext() {
+                if (currentReadResult != null) {
+                    return true;
+                }
+
+                while (iterator.hasNext()) {
+                    VersionChain chain = iterator.next();
+                    ReadResult readResult = read(chain, timestamp, null, filter);
+
+                    if (!readResult.isEmpty()) {
+                        currentChain = chain;
+                        currentReadResult = readResult;
+
+                        return true;
+                    }
+                }
+
+                return false;
+            }
+
+            @Override
+            public ReadResult next() {
+                if (!hasNext()) {
+                    throw new NoSuchElementException();
+                }
+
+                ReadResult res = currentReadResult;
+
+                currentReadResult = null;
+
+                return res;
+            }
+        };
     }
 
     /** {@inheritDoc} */
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 96d4843873..9cc60288cd 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.PartitionScanCursor;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
@@ -273,7 +274,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
             return null;
         }
 
-        return findLatestRowVersion(versionChain, txId, MATCH_ALL).binaryRow();
+        return findLatestRowVersion(versionChain, txId, MATCH_ALL);
     }
 
     @Override
@@ -300,13 +301,13 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         }
     }
 
-    private ReadResult findLatestRowVersion(VersionChain versionChain, UUID txId, Predicate<BinaryRow> keyFilter) {
+    private @Nullable BinaryRow findLatestRowVersion(VersionChain versionChain, UUID txId, Predicate<BinaryRow> keyFilter) {
         RowVersion rowVersion = readRowVersion(versionChain.headLink(), ALWAYS_LOAD_VALUE);
 
         ByteBufferRow row = rowVersionToBinaryRow(rowVersion);
 
         if (!keyFilter.test(row)) {
-            return ReadResult.EMPTY;
+            return null;
         }
 
         if (versionChain.isUncommitted()) {
@@ -316,11 +317,10 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
 
             throwIfChainBelongsToAnotherTx(versionChain, txId);
 
-            return ReadResult.createFromWriteIntent(row, versionChain.transactionId(), versionChain.commitTableId(), null,
-                    versionChain.commitPartitionId());
+            return row;
         }
 
-        return ReadResult.createFromCommitted(row);
+        return row;
     }
 
     private RowVersion readRowVersion(long nextLink, Predicate<HybridTimestamp> loadValue) {
@@ -351,36 +351,9 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         return new ByteBufferRow(rowVersion.value());
     }
 
-    private @Nullable BinaryRow findRowVersionInChain(
-            VersionChain versionChain,
-            @Nullable UUID transactionId,
-            @Nullable HybridTimestamp timestamp,
-            Predicate<BinaryRow> keyFilter
-    ) {
-        assert transactionId != null ^ timestamp != null;
-
-        if (transactionId != null) {
-            ReadResult res = findLatestRowVersion(versionChain, transactionId, keyFilter);
-
-            if (res == null) {
-                return null;
-            }
-
-            return res.binaryRow();
-        } else {
-            ReadResult res = findRowVersionByTimestamp(versionChain, timestamp);
-
-            if (res == null) {
-                return null;
-            }
-
-            BinaryRow row = res.binaryRow();
-
-            return keyFilter.test(row) ? row : null;
-        }
-    }
-
     private ReadResult findRowVersionByTimestamp(VersionChain versionChain, HybridTimestamp timestamp) {
+        assert timestamp != null;
+
         long headLink = versionChain.headLink();
 
         if (versionChain.isUncommitted()) {
@@ -660,16 +633,26 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
 
     @Override
     public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, UUID txId) throws TxIdMismatchException, StorageException {
-        return internalScan(keyFilter, txId, null);
+        return internalScan(keyFilter, txId);
     }
 
     @Override
-    public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) throws StorageException {
-        return internalScan(keyFilter, null, timestamp);
+    public PartitionScanCursor scan(Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) throws StorageException {
+        assert timestamp != null;
+
+        IgniteCursor<VersionChain> treeCursor;
+
+        try {
+            treeCursor = versionChainTree.find(null, null);
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Find failed", e);
+        }
+
+        return new TimestampCursor(treeCursor, keyFilter, timestamp);
     }
 
-    private Cursor<BinaryRow> internalScan(Predicate<BinaryRow> keyFilter, @Nullable UUID txId, @Nullable HybridTimestamp timestamp) {
-        assert txId != null ^ timestamp != null;
+    private Cursor<BinaryRow> internalScan(Predicate<BinaryRow> keyFilter, UUID txId) {
+        assert txId != null;
 
         IgniteCursor<VersionChain> treeCursor;
 
@@ -679,7 +662,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
             throw new StorageException("Find failed", e);
         }
 
-        return new ScanCursor(treeCursor, keyFilter, txId, timestamp);
+        return new ScanCursor(treeCursor, keyFilter, txId);
     }
 
     @Override
@@ -712,6 +695,110 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         // TODO: IGNITE-17132 Implement it
     }
 
+    private class TimestampCursor implements PartitionScanCursor {
+        private final IgniteCursor<VersionChain> treeCursor;
+
+        private final Predicate<BinaryRow> keyFilter;
+
+        private final HybridTimestamp timestamp;
+
+        private ReadResult nextRead = null;
+
+        private boolean iterationExhausted = false;
+
+        public TimestampCursor(
+                IgniteCursor<VersionChain> treeCursor,
+                Predicate<BinaryRow> keyFilter,
+                HybridTimestamp timestamp
+        ) {
+            this.treeCursor = treeCursor;
+            this.keyFilter = keyFilter;
+            this.timestamp = timestamp;
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (nextRead != null) {
+                return true;
+            }
+
+            if (iterationExhausted) {
+                return false;
+            }
+
+            while (true) {
+                boolean positionedToNext = tryAdvanceTreeCursor();
+
+                if (!positionedToNext) {
+                    iterationExhausted = true;
+                    return false;
+                }
+
+                VersionChain chain = getCurrentChainFromTreeCursor();
+                ReadResult res = findRowVersionByTimestamp(chain, timestamp);
+
+                if (res.isEmpty()) {
+                    continue;
+                }
+
+                if (!keyFilter.test(res.binaryRow())) {
+                    continue;
+                }
+
+                nextRead = res;
+                return true;
+            }
+        }
+
+        private boolean tryAdvanceTreeCursor() {
+            try {
+                return treeCursor.next();
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException("Error when trying to advance tree cursor", e);
+            }
+        }
+
+        private VersionChain getCurrentChainFromTreeCursor() {
+            try {
+                return treeCursor.get();
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException("Failed to get element from tree cursor", e);
+            }
+        }
+
+        @Override
+        public ReadResult next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException("The cursor is exhausted");
+            }
+
+            assert nextRead != null;
+
+            ReadResult res = nextRead;
+            nextRead = null;
+
+            return res;
+        }
+
+        @Override
+        public void close() {
+            // No-op.
+        }
+
+        @Override
+        public BinaryRow committed(HybridTimestamp timestamp) {
+            if (iterationExhausted) {
+                throw new NoSuchElementException();
+            }
+
+            VersionChain chain = getCurrentChainFromTreeCursor();
+
+            ReadResult res = findRowVersionByTimestamp(chain, timestamp);
+
+            return res.binaryRow();
+        }
+    }
+
     private class ScanCursor implements Cursor<BinaryRow> {
         private final IgniteCursor<VersionChain> treeCursor;
 
@@ -719,8 +806,6 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
 
         private final @Nullable UUID transactionId;
 
-        private final @Nullable HybridTimestamp timestamp;
-
         private BinaryRow nextRow = null;
 
         private boolean iterationExhausted = false;
@@ -728,13 +813,11 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         public ScanCursor(
                 IgniteCursor<VersionChain> treeCursor,
                 Predicate<BinaryRow> keyFilter,
-                @Nullable UUID transactionId,
-                @Nullable HybridTimestamp timestamp
+                @Nullable UUID transactionId
         ) {
             this.treeCursor = treeCursor;
             this.keyFilter = keyFilter;
             this.transactionId = transactionId;
-            this.timestamp = timestamp;
         }
 
         @Override
@@ -756,7 +839,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
                 }
 
                 VersionChain chain = getCurrentChainFromTreeCursor();
-                BinaryRow row = findRowVersionInChain(chain, transactionId, timestamp, keyFilter);
+                BinaryRow row = findLatestRowVersion(chain, transactionId, keyFilter);
 
                 if (row != null) {
                     nextRow = row;
@@ -797,7 +880,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
 
         @Override
         public void close() {
-            // no-op
+            // No-op.
         }
     }
 }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 2d0d46e4be..3028d9afe2 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.PartitionScanCursor;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
@@ -541,6 +542,10 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         // It is guaranteed by descending order of timestamps.
         seekIterator.seek(keyBuf.array());
 
+        return processIterator(seekIterator, rowId, timestamp, keyBuf);
+    }
+
+    private static ReadResult processIterator(RocksIterator seekIterator, RowId rowId, HybridTimestamp timestamp, ByteBuffer keyBuf) {
         // There's no guarantee that required key even exists. If it doesn't, then "seek" will point to a different key.
         // To avoid returning its value, we have to check that actual key matches what we need.
         // Here we prepare direct buffer to read key without timestamp. Shared direct buffer is used to avoid extra memory allocations.
@@ -689,8 +694,150 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
 
     /** {@inheritDoc} */
     @Override
-    public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) throws StorageException {
-        return scan(keyFilter, timestamp, null);
+    public PartitionScanCursor scan(Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) throws StorageException {
+        assert timestamp != null;
+
+        // Set next partition as an upper bound.
+        ReadOptions options = new ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
+
+        RocksIterator it = db.newIterator(cf, options);
+
+        // Here's seek buffer itself. Originally it contains a valid partition id, row id payload that's filled with zeroes, and maybe
+        // a timestamp value. Zero row id guarantees that it's lexicographically less than or equal to any other row id stored in the
+        // partition.
+        // Byte buffer from a thread-local field can't be used here, because of two reasons:
+        //  - no one guarantees that there will only be a single cursor;
+        //  - no one guarantees that returned cursor will not be used by other threads.
+        // The thing is, we need this buffer to preserve its content between invocations of "hasNext" method.
+        ByteBuffer seekKeyBuf = ByteBuffer.allocate(MAX_KEY_SIZE).order(BIG_ENDIAN).putShort((short) partitionId);
+
+        return new PartitionScanCursor() {
+
+            private RowId currentRowId;
+
+            @Override
+            public BinaryRow committed(HybridTimestamp timestamp) {
+                if (currentRowId == null) {
+                    throw new IllegalStateException();
+                }
+
+                seekKeyBuf.putLong(ROW_ID_OFFSET, currentRowId.mostSignificantBits());
+                seekKeyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, currentRowId.leastSignificantBits());
+                putTimestamp(seekKeyBuf.position(ROW_PREFIX_SIZE), timestamp);
+
+                it.seek(seekKeyBuf.position(0));
+
+                ReadResult readResult = processIterator(it, currentRowId, timestamp, seekKeyBuf);
+
+                if (readResult.isEmpty()) {
+                    return null;
+                }
+
+                return readResult.binaryRow();
+            }
+
+            /** Cached value for {@link #next()} method. Also optimizes the code of {@link #hasNext()}. */
+            private ReadResult next;
+
+            /** {@inheritDoc} */
+            @Override
+            public boolean hasNext() {
+                // Fast-path for consecutive invocations.
+                if (next != null) {
+                    return true;
+                }
+
+                // Prepare direct buffer slice to read keys from the iterator.
+                ByteBuffer directBuffer = MV_KEY_BUFFER.get().position(0);
+
+                while (true) {
+                    it.seek(seekKeyBuf);
+
+                    // We should check validity after each seek. Here an invalid iterator means one of two things:
+                    //  - partition is empty;
+                    //  - iterator exhausted all the data in partition.
+                    if (invalid(it)) {
+                        return false;
+                    }
+
+                    it.key(directBuffer.position(0));
+
+                    directBuffer.position(ROW_ID_OFFSET);
+                    long msb = directBuffer.getLong();
+                    long lsb = directBuffer.getLong();
+
+                    var rowId = new RowId(partitionId, msb, lsb);
+
+                    seekKeyBuf.putLong(ROW_ID_OFFSET, msb);
+                    seekKeyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, lsb);
+                    putTimestamp(seekKeyBuf.position(ROW_PREFIX_SIZE), timestamp);
+
+                    it.seek(seekKeyBuf.position(0));
+
+                    ReadResult readResult = processIterator(it, rowId, timestamp, seekKeyBuf);
+
+                    if (readResult.isEmpty()) {
+                        incrementRowId(seekKeyBuf);
+
+                        seekKeyBuf.position(0);
+                        continue;
+                    }
+
+                    next = readResult;
+                    currentRowId = rowId;
+
+                    return true;
+                }
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public ReadResult next() {
+                if (!hasNext()) {
+                    throw new NoSuchElementException();
+                }
+
+                ReadResult res = next;
+
+                next = null;
+
+                incrementRowId(seekKeyBuf);
+
+                seekKeyBuf.position(0);
+
+                return res;
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public void close() throws Exception {
+                IgniteUtils.closeAll(it, options);
+            }
+
+            private void incrementRowId(ByteBuffer buf) {
+                long lsb = 1 + buf.getLong(ROW_ID_OFFSET + Long.BYTES);
+
+                buf.putLong(ROW_ID_OFFSET + Long.BYTES, lsb);
+
+                if (lsb != 0L) {
+                    return;
+                }
+
+                long msb = 1 + buf.getLong(ROW_ID_OFFSET);
+
+                buf.putLong(ROW_ID_OFFSET, msb);
+
+                if (msb != 0L) {
+                    return;
+                }
+
+                short partitionId = (short) (1 + buf.getShort(0));
+
+                assert partitionId != 0;
+
+                buf.putShort(0, partitionId);
+            }
+        };
     }
 
     private Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, @Nullable HybridTimestamp timestamp, @Nullable UUID txId)
@@ -698,7 +845,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
         assert timestamp == null ^ txId == null;
 
         // Set next partition as an upper bound.
-        var options = new ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
+        ReadOptions options = new ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
 
         RocksIterator it = db.newIterator(cf, options);
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java
index 940e8047f9..aa09719b8c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.tx.Timestamp;
 import org.apache.ignite.internal.tx.TxManager;
@@ -542,7 +543,7 @@ public class VersionedRowStore {
      */
     public Cursor<BinaryRow> scan(Predicate<BinaryRow> pred) {
         // TODO <MUTED> https://issues.apache.org/jira/browse/IGNITE-17309 Transactional support for partition scans
-        Cursor<BinaryRow> delegate = storage.scan(pred, Timestamp.nextVersion());
+        Cursor<ReadResult> delegate = storage.scan(pred, Timestamp.nextVersion());
 
         // TODO asch add tx support IGNITE-15087.
         return new Cursor<BinaryRow>() {
@@ -560,7 +561,7 @@ public class VersionedRowStore {
                 }
 
                 if (delegate.hasNext()) {
-                    cur = delegate.next();
+                    cur = delegate.next().binaryRow();
 
                     return cur != null ? true : hasNext(); // Skip tombstones.
                 }