You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vp...@apache.org on 2023/02/09 17:47:38 UTC

[ignite-3] branch main updated: IGNITE-18497 Fixed read only get returns a first one value getting from primary index (#1624)

This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 5f834c5c7e IGNITE-18497 Fixed read only get returns a first one value getting from primary index (#1624)
5f834c5c7e is described below

commit 5f834c5c7ef87fece68f6af4d5f1329aa77fc19b
Author: Denis Chudov <mo...@gmail.com>
AuthorDate: Thu Feb 9 19:47:31 2023 +0200

    IGNITE-18497 Fixed read only get returns a first one value getting from primary index (#1624)
---
 .../apache/ignite/internal/util/IgniteUtils.java   |  52 +++++
 .../ItRaftCommandLeftInLogUntilRestartTest.java    |   8 +-
 .../replicator/PartitionReplicaListener.java       | 139 ++++++++++--
 .../replication/PartitionReplicaListenerTest.java  | 251 ++++++++++++++++++---
 .../internal/tx/message/TxStateReplicaRequest.java |   2 +-
 5 files changed, 395 insertions(+), 57 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 9d7a6430c9..9bdc8f64b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -37,10 +37,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -944,4 +946,54 @@ public class IgniteUtils {
     public static ObjectName makeMbeanName(String group, String name) throws MalformedObjectNameException {
         return new ObjectName(String.format("%s:group=%s,name=%s", JMX_MBEAN_PACKAGE, group, name));
     }
+
+    /**
+     * Filter the collection using the given predicate, keeping the elements in the collection's iteration order.
+     *
+     * @param collection Collection to filter.
+     * @param predicate Predicate that an element must satisfy to be included in the result.
+     * @return New list containing only the elements for which the predicate returned {@code true}.
+     */
+    public static <T> List<T> filter(Collection<T> collection, Predicate<T> predicate) {
+        List<T> result = new ArrayList<>();
+
+        for (T e : collection) {
+            if (predicate.test(e)) {
+                result.add(e);
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Find any element in given collection; same as {@link #findAny(Collection, Predicate)} with a {@code null} predicate.
+     *
+     * @param collection Collection.
+     * @return Optional with the first element in iteration order; empty if the collection is empty or that element is {@code null}.
+     */
+    public static <T> Optional<T> findAny(Collection<T> collection) {
+        return findAny(collection, null);
+    }
+
+    /**
+     * Find the first element, in iteration order, of the given collection for which the predicate returns {@code true}.
+     *
+     * @param collection Collection.
+     * @param predicate Predicate; if {@code null}, every element is considered a match.
+     * @return Optional containing the matching element; empty if nothing matches or the matching element is {@code null}.
+     */
+    public static <T> Optional<T> findAny(Collection<T> collection, @Nullable Predicate<T> predicate) {
+        if (!collection.isEmpty()) {
+            for (Iterator<T> it = collection.iterator(); it.hasNext(); ) {
+                T t = it.next();
+
+                if (predicate == null || predicate.test(t)) {
+                    return Optional.ofNullable(t);
+                }
+            }
+        }
+
+        return Optional.empty();
+    }
 }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
index dd208ea7f2..e2cdf1a131 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.sql.engine.AbstractBasicIntegrationTest;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
@@ -274,10 +275,9 @@ public class ItRaftCommandLeftInLogUntilRestartTest extends AbstractBasicIntegra
 
                 BinaryRow readOnlyRow = table.internalTable().get(testKey, new HybridClockImpl().now(), ignite.node()).get();
 
-                //TODO: IGNITE-18497 Readonly check is possible only when the readonly read will be fixed.
-                //assertNotNull(readOnlyRow);
-                //assertEquals(row[1], new Row(table.schemaView().schema(), readOnlyRow).stringValue(2));
-                //assertEquals(row[2], new Row(table.schemaView().schema(), readOnlyRow).doubleValue(1));
+                assertNotNull(readOnlyRow);
+                assertEquals(row[1], new Row(table.schemaView().schema(), readOnlyRow).stringValue(2));
+                assertEquals(row[2], new Row(table.schemaView().schema(), readOnlyRow).doubleValue(1));
             } catch (Exception e) {
                 new RuntimeException(IgniteStringFormatter.format("Cannot check a row {}", row), e);
             }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 7c5496fe88..9c0954b0e8 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -22,6 +22,8 @@ import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import static org.apache.ignite.internal.util.IgniteUtils.filter;
+import static org.apache.ignite.internal.util.IgniteUtils.findAny;
 import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR;
 import static org.apache.ignite.lang.IgniteStringFormatter.format;
 
@@ -351,7 +353,7 @@ public class PartitionReplicaListener implements ReplicaListener {
 
                 if (txMeta == null) {
                     // All future transactions will be committed after the resolution processed.
-                    hybridClock.update(txStateReq.commitTimestamp());
+                    hybridClock.update(txStateReq.readTimestamp());
                 }
 
                 txStateFut.complete(txMeta);
@@ -474,7 +476,7 @@ public class PartitionReplicaListener implements ReplicaListener {
 
         CompletableFuture<Void> safeReadFuture = isPrimary ? completedFuture(null) : safeTime.waitFor(request.readTimestamp());
 
-        return safeReadFuture.thenCompose(unused -> resolveRowByPk(searchRow, readTimestamp));
+        return safeReadFuture.thenCompose(unused -> resolveRowByPkForReadOnly(searchRow, readTimestamp));
     }
 
     /**
@@ -502,7 +504,7 @@ public class PartitionReplicaListener implements ReplicaListener {
             ArrayList<CompletableFuture<BinaryRow>> resolutionFuts = new ArrayList<>(searchRows.size());
 
             for (BinaryRow searchRow : searchRows) {
-                CompletableFuture<BinaryRow> fut = resolveRowByPk(searchRow, readTimestamp);
+                CompletableFuture<BinaryRow> fut = resolveRowByPkForReadOnly(searchRow, readTimestamp);
 
                 resolutionFuts.add(fut);
             }
@@ -1136,35 +1138,94 @@ public class PartitionReplicaListener implements ReplicaListener {
      * @param ts A timestamp regarding which we need to resolve the given row.
      * @return Result of the given action.
      */
-    private CompletableFuture<BinaryRow> resolveRowByPk(BinaryRow searchKey, HybridTimestamp ts) {
+    private CompletableFuture<BinaryRow> resolveRowByPkForReadOnly(BinaryRow searchKey, HybridTimestamp ts) {
         try (Cursor<RowId> cursor = pkIndexStorage.get().get(searchKey)) {
+            List<ReadResult> candidates = new ArrayList<>();
+
             for (RowId rowId : cursor) {
                 ReadResult readResult = mvDataStorage.read(rowId, ts);
 
-                return resolveReadResult(readResult, ts, () -> {
-                    HybridTimestamp newestCommitTimestamp = readResult.newestCommitTimestamp();
+                if (!readResult.isEmpty() || readResult.isWriteIntent()) {
+                    candidates.add(readResult);
+                }
+            }
 
-                    if (newestCommitTimestamp == null) {
-                        return null;
-                    }
+            if (candidates.isEmpty()) {
+                return completedFuture(null);
+            }
 
-                    ReadResult committedReadResult = mvDataStorage.read(rowId, newestCommitTimestamp);
+            // TODO https://issues.apache.org/jira/browse/IGNITE-18767 scan of multiple write intents should not be needed
+            List<ReadResult> writeIntents = filter(candidates, ReadResult::isWriteIntent);
 
-                    assert !committedReadResult.isWriteIntent() :
-                            "The result is not committed [rowId=" + rowId + ", timestamp="
-                                    + newestCommitTimestamp + ']';
+            if (!writeIntents.isEmpty()) {
+                ReadResult writeIntent = writeIntents.get(0);
 
-                    return committedReadResult.binaryRow();
-                });
-            }
+                // Assume that all write intents for the same key belong to the same transaction, as the key should be exclusively locked.
+                // This means that we can just resolve the state of this transaction.
+                checkWriteIntentsBelongSameTx(writeIntents);
 
-            return completedFuture(null);
+                return resolveTxState(
+                                new TablePartitionId(writeIntent.commitTableId(), writeIntent.commitPartitionId()),
+                                writeIntent.transactionId(),
+                                ts)
+                        .thenApply(readLastCommitted -> {
+                            if (readLastCommitted) {
+                                for (ReadResult wi : writeIntents) {
+                                    HybridTimestamp newestCommitTimestamp = wi.newestCommitTimestamp();
+
+                                    if (newestCommitTimestamp == null) {
+                                        continue;
+                                    }
+
+                                    ReadResult committedReadResult = mvDataStorage.read(wi.rowId(), newestCommitTimestamp);
+
+                                    assert !committedReadResult.isWriteIntent() :
+                                            "The result is not committed [rowId=" + wi.rowId() + ", timestamp="
+                                                    + newestCommitTimestamp + ']';
+
+                                    return committedReadResult.binaryRow();
+                                }
+
+                                return findAny(candidates, c -> !c.isWriteIntent() && !c.isEmpty()).map(ReadResult::binaryRow)
+                                        .orElse(null);
+                            } else {
+                                return findAny(writeIntents, wi -> !wi.isEmpty()).map(ReadResult::binaryRow)
+                                        .orElse(null);
+                            }
+                        });
+            } else {
+                BinaryRow result = findAny(candidates, r -> !r.isEmpty()).map(ReadResult::binaryRow)
+                        .orElse(null);
+
+                return completedFuture(result);
+            }
         } catch (Exception e) {
             throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
                     format("Unable to close cursor [tableId={}]", tableId), e);
         }
     }
 
+    /**
+     * Check, via assertions, that all given write intents share the transaction id, commit table id and commit partition id.
+     *
+     * @param writeIntents Write intents; must be non-empty, otherwise {@code NoSuchElementException} is thrown.
+     */
+    private static void checkWriteIntentsBelongSameTx(Collection<ReadResult> writeIntents) {
+        ReadResult writeIntent = findAny(writeIntents).orElseThrow();
+
+        for (ReadResult wi : writeIntents) {
+            assert wi.transactionId().equals(writeIntent.transactionId())
+                    : "Unexpected write intent, tx1=" + writeIntent.transactionId() + ", tx2=" + wi.transactionId();
+
+            assert wi.commitTableId().equals(writeIntent.commitTableId())
+                    : "Unexpected write intent, commitTableId1=" + writeIntent.commitTableId() + ", commitTableId2=" + wi.commitTableId();
+
+            assert wi.commitPartitionId() == writeIntent.commitPartitionId()
+                    : "Unexpected write intent, commitPartitionId1=" + writeIntent.commitPartitionId()
+                    + ", commitPartitionId2=" + wi.commitPartitionId();
+        }
+    }
+
     /**
      * Tests row values for equality.
      *
@@ -1940,7 +2001,7 @@ public class PartitionReplicaListener implements ReplicaListener {
             @Nullable Supplier<BinaryRow> lastCommitted
     ) {
         if (readResult == null) {
-            return null;
+            return completedFuture(null);
         } else {
             if (txId != null) {
                 // RW request.
@@ -1979,22 +2040,50 @@ public class PartitionReplicaListener implements ReplicaListener {
             HybridTimestamp timestamp,
             Supplier<BinaryRow> lastCommitted
     ) {
-        ReplicationGroupId commitGrpId = new TablePartitionId(readResult.commitTableId(), readResult.commitPartitionId());
+        return resolveTxState(
+                        new TablePartitionId(readResult.commitTableId(), readResult.commitPartitionId()),
+                        readResult.transactionId(),
+                        timestamp)
+                .thenApply(readLastCommitted -> {
+                    if (readLastCommitted) {
+                        return lastCommitted.get();
+                    } else {
+                        return readResult.binaryRow();
+                    }
+                });
+    }
 
+    /**
+     * Resolve the actual tx state.
+     *
+     * @param commitGrpId Commit partition id.
+     * @param txId Transaction id.
+     * @param timestamp Timestamp.
+     * @return Future with boolean value, indicating whether the transaction was committed before timestamp.
+     */
+    private CompletableFuture<Boolean> resolveTxState(
+            ReplicationGroupId commitGrpId,
+            UUID txId,
+            HybridTimestamp timestamp
+    ) {
         return placementDriver.sendMetaRequest(commitGrpId, FACTORY.txStateReplicaRequest()
                         .groupId(commitGrpId)
-                        .commitTimestamp(timestamp)
-                        .txId(readResult.transactionId())
+                        .readTimestamp(timestamp)
+                        .txId(txId)
                         .build())
                 .thenApply(txMeta -> {
                     if (txMeta == null) {
-                        return lastCommitted.get();
-                    } else if (txMeta.txState() == TxState.COMMITED && txMeta.commitTimestamp().compareTo(timestamp) <= 0) {
-                        return readResult.binaryRow();
+                        return true;
+                    } else if (txMeta.txState() == TxState.COMMITED) {
+                        if (txMeta.commitTimestamp().compareTo(timestamp) <= 0) {
+                            return false;
+                        } else {
+                            return true;
+                        }
                     } else {
                         assert txMeta.txState() == TxState.ABORTED : "Unexpected transaction state [state=" + txMeta.txState() + ']';
 
-                        return lastCommitted.get();
+                        return true;
                     }
                 });
     }
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 5dcc07baf4..5997e3be94 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -22,13 +22,13 @@ import static org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
 import static org.apache.ignite.internal.util.ArrayUtils.asList;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -37,12 +37,15 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
@@ -87,6 +90,9 @@ import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.command.PartitionCommand;
+import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
+import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replicator.LeaderOrTxState;
@@ -121,7 +127,45 @@ import org.mockito.Mock;
 
 /** There are tests for partition replica listener. */
 public class PartitionReplicaListenerTest extends IgniteAbstractTest {
-    private static final Supplier<CompletableFuture<?>> DEFAULT_MOCK_RAFT_FUTURE_SUPPLIER = () -> completedFuture(null);
+    /** Partition id. */
+    private static final int partId = 0;
+
+    /** Table id. */
+    private static final UUID tblId = UUID.randomUUID();
+
+    private static final Map<UUID, Set<RowId>> pendingRows = new ConcurrentHashMap<>();
+
+    /** The storage stores partition data. */
+    private static final TestMvPartitionStorage testMvPartitionStorage = new TestMvPartitionStorage(partId);
+
+    private static LockManager lockManager = new HeapLockManager();
+
+    private static final Function<PartitionCommand, CompletableFuture<?>> DEFAULT_MOCK_RAFT_FUTURE_CLOSURE = cmd -> {
+        if (cmd instanceof TxCleanupCommand) { // Cleanup: commitWrite every row recorded for this tx, then release its locks.
+            Set<RowId> rows = pendingRows.remove(cmd.txId());
+
+            if (rows != null) {
+                for (RowId row : rows) {
+                    testMvPartitionStorage.commitWrite(row, ((TxCleanupCommand) cmd).commitTimestamp().asHybridTimestamp());
+                }
+            }
+
+            lockManager.locks(cmd.txId()).forEachRemaining(lock -> lockManager.release(lock));
+        } else if (cmd instanceof UpdateCommand) { // Update: remember the touched row id under its transaction id.
+            pendingRows.compute(cmd.txId(), (txId, v) -> {
+                if (v == null) {
+                    v = new HashSet<>();
+                }
+
+                RowId rowId = new RowId(partId, ((UpdateCommand) cmd).rowUuid());
+                v.add(rowId);
+
+                return v;
+            });
+        }
+
+        return completedFuture(null); // Every command, including unhandled kinds, completes immediately.
+    };
 
     /** Tx messages factory. */
     private static final TxMessagesFactory TX_MESSAGES_FACTORY = new TxMessagesFactory();
@@ -129,12 +173,6 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
     /** Table messages factory. */
     private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new TableMessagesFactory();
 
-    /** Partition id. */
-    private static final int partId = 0;
-
-    /** Table id. */
-    private static final UUID tblId = UUID.randomUUID();
-
     /** Replication group id. */
     private static final ReplicationGroupId grpId = new TablePartitionId(tblId, partId);
 
@@ -144,16 +182,13 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
     /** The storage stores transaction states. */
     private static final TestTxStateStorage txStateStorage = new TestTxStateStorage();
 
-    /** The storage stores partition data. */
-    private static final TestMvPartitionStorage testMvPartitionStorage = new TestMvPartitionStorage(partId);
-
     /** Local cluster node. */
     private static final ClusterNode localNode = new ClusterNode("node1", "node1", NetworkAddress.from("127.0.0.1:127"));
 
     /** Another (not local) cluster node. */
     private static final ClusterNode anotherNode = new ClusterNode("node2", "node2", NetworkAddress.from("127.0.0.2:127"));
 
-    private static PlacementDriver placementDriver = mock(PlacementDriver.class);
+    private static final PlacementDriver placementDriver = mock(PlacementDriver.class);
 
     private static PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(testMvPartitionStorage);
 
@@ -192,9 +227,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
     /** Secondary hash index. */
     private static TableSchemaAwareIndexStorage hashIndexStorage;
 
-    private static Supplier<CompletableFuture<?>> raftClientFutureSupplier = DEFAULT_MOCK_RAFT_FUTURE_SUPPLIER;
-
-    private static LockManager lockManager = new HeapLockManager();
+    private static Function<PartitionCommand, CompletableFuture<?>> raftClientFutureClosure = DEFAULT_MOCK_RAFT_FUTURE_CLOSURE;
 
     @BeforeAll
     private static void beforeAll() {
@@ -206,7 +239,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
             return completedFuture(new LeaderWithTerm(new Peer(localNode.name()), 1L));
         });
 
-        when(mockRaftClient.run(any())).thenAnswer(invocationOnMock -> raftClientFutureSupplier.get());
+        when(mockRaftClient.run(any())).thenAnswer(invocationOnMock -> raftClientFutureClosure.apply(invocationOnMock.getArgument(0)));
 
         when(topologySrv.getByConsistentId(any())).thenAnswer(invocationOnMock -> {
             String consistentId = invocationOnMock.getArgument(0);
@@ -223,7 +256,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
         HybridTimestamp txFixedTimestamp = clock.now();
 
-        when(placementDriver.sendMetaRequest(eq(grpId), any())).thenAnswer(invocationOnMock -> {
+        when(placementDriver.sendMetaRequest(any(), any())).thenAnswer(invocationOnMock -> {
             TxMeta txMeta;
 
             if (txState == null) {
@@ -319,13 +352,15 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         ((TestHashIndexStorage) hashIndexStorage.storage()).clear();
         ((TestSortedIndexStorage) sortedIndexStorage.storage()).clear();
         testMvPartitionStorage.clear();
+        pendingRows.clear();
+        //lockManager.locks(txId).forEachRemaining(lock -> lockManager.release(lock));
     }
 
     @Test
     public void testTxStateReplicaRequestEmptyState() throws Exception {
         CompletableFuture<?> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateReplicaRequest()
                 .groupId(grpId)
-                .commitTimestamp(clock.now())
+                .readTimestamp(clock.now())
                 .txId(Timestamp.nextVersion().toUuid())
                 .build());
 
@@ -345,7 +380,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
         CompletableFuture<?> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateReplicaRequest()
                 .groupId(grpId)
-                .commitTimestamp(readTimestamp)
+                .readTimestamp(readTimestamp)
                 .txId(txId)
                 .build());
 
@@ -362,7 +397,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
         CompletableFuture<?> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateReplicaRequest()
                 .groupId(grpId)
-                .commitTimestamp(clock.now())
+                .readTimestamp(clock.now())
                 .txId(Timestamp.nextVersion().toUuid())
                 .build());
 
@@ -807,7 +842,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         doSingleRowRequest(txId, binaryRow(0), RequestType.RW_DELETE_EXACT);
         checkRowInMvStorage(binaryRow(0), false);
 
-        lockManager.locks(txId).forEachRemaining(lock -> lockManager.release(lock));
+        cleanup(txId);
     }
 
     @Test
@@ -843,7 +878,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         checkRowInMvStorage(row0, false);
         checkRowInMvStorage(row1, false);
 
-        lockManager.locks(txId).forEachRemaining(lock -> lockManager.release(lock));
+        cleanup(txId);
     }
 
     private void doSingleRowRequest(UUID txId, BinaryRow binaryRow, RequestType requestType) {
@@ -889,7 +924,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                 () -> checkRowInMvStorage(binaryRow(0), true)
         );
 
-        lockManager.locks(txId).forEachRemaining(lock -> lockManager.release(lock));
+        cleanup(txId);
     }
 
     @Test
@@ -915,7 +950,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                 () -> checkRowInMvStorage(binaryRow(0), true)
         );
 
-        lockManager.locks(txId).forEachRemaining(lock -> lockManager.release(lock));
+        cleanup(txId);
     }
 
     private void checkRowInMvStorage(BinaryRow binaryRow, boolean shouldBePresent) {
@@ -956,14 +991,14 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         // Check that cleanup request processing awaits all write requests.
         CompletableFuture<?> writeFut = new CompletableFuture<>();
 
-        raftClientFutureSupplier = () -> writeFut;
+        raftClientFutureClosure = cmd -> writeFut;
 
         try {
             CompletableFuture<?> replicaWriteFut = partitionReplicaListener.invoke(updatingRequestSupplier.get());
 
             assertFalse(replicaWriteFut.isDone());
 
-            raftClientFutureSupplier = DEFAULT_MOCK_RAFT_FUTURE_SUPPLIER;
+            raftClientFutureClosure = DEFAULT_MOCK_RAFT_FUTURE_CLOSURE;
 
             HybridTimestamp now = clock.now();
 
@@ -984,7 +1019,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
             assertThat(replicaCleanupFut, willSucceedFast());
         } finally {
-            raftClientFutureSupplier = DEFAULT_MOCK_RAFT_FUTURE_SUPPLIER;
+            raftClientFutureClosure = DEFAULT_MOCK_RAFT_FUTURE_CLOSURE;
         }
 
         // Check that one more write after cleanup is discarded.
@@ -992,6 +1027,168 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         assertThat(writeAfterCleanupFuture, willFailFast(TransactionException.class));
     }
 
+    @Test
+    public void testReadOnlyGetAfterRowRewrite() {
+        // Exercise the single-row RO get scenario for all 8 combinations of the three flags.
+        for (boolean insertFirst : new boolean[] {true, false}) {
+            for (boolean upsertAfterDelete : new boolean[] {true, false}) {
+                for (boolean committed : new boolean[] {true, false}) {
+                    testReadOnlyGetAfterRowRewrite0(insertFirst, upsertAfterDelete, committed, false);
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testReadOnlyGetAllAfterRowRewrite() {
+        // Exercise the multi-row RO getAll scenario for all 8 combinations of the three flags.
+        for (boolean insertFirst : new boolean[] {true, false}) {
+            for (boolean upsertAfterDelete : new boolean[] {true, false}) {
+                for (boolean committed : new boolean[] {true, false}) {
+                    testReadOnlyGetAfterRowRewrite0(insertFirst, upsertAfterDelete, committed, true);
+                }
+            }
+        }
+    }
+
+    /**
+     * Puts several records into the storage, optionally leaving them as write intents, alternately deletes and upserts the same row
+     * within the same RW transaction, then checks read correctness via a read-only request.
+     *
+     * @param insertFirst Whether to insert some values before RW transaction.
+     * @param upsertAfterDelete Whether to insert value after delete in RW transaction, so that it is present as a non-null write intent.
+     * @param committed Whether to commit RW transaction before doing RO request.
+     * @param multiple Whether to check multiple rows via getAll request.
+     */
+    public void testReadOnlyGetAfterRowRewrite0(boolean insertFirst, boolean upsertAfterDelete, boolean committed, boolean multiple) {
+        beforeTest();
+
+        BinaryRow br1 = binaryRow(1);
+        BinaryRow br2 = binaryRow(2);
+
+        if (insertFirst) {
+            UUID tx0 = beginTx();
+            upsert(tx0, br1);
+            upsert(tx0, br2);
+            cleanup(tx0);
+        }
+
+        txState = null;
+
+        UUID tx1 = beginTx();
+        delete(tx1, br1);
+        upsert(tx1, br1);
+
+        // Keep rewriting br1 until the first row id the PK index returns for it reads as a null row at MAX_VALUE.
+        while (true) {
+            delete(tx1, br1);
+
+            if (upsertAfterDelete) {
+                upsert(tx1, br1);
+            }
+
+            Cursor<RowId> cursor = pkStorage.get().get(br1);
+            RowId rowId = cursor.next();
+            BinaryRow row = testMvPartitionStorage.read(rowId, HybridTimestamp.MAX_VALUE).binaryRow();
+
+            if (row == null) {
+                break;
+            }
+        }
+
+        if (committed) {
+            cleanup(tx1);
+        }
+
+        if (multiple) {
+            Set<BinaryRow> allRows = insertFirst ? Set.of(br1, br2) : Set.of(br1);
+            Set<BinaryRow> allRowsButModified = insertFirst ? Set.of(br2) : Set.of();
+            Set<BinaryRow> expected = committed
+                    ? (upsertAfterDelete ? allRows : allRowsButModified)
+                    : (insertFirst ? allRows : Set.of());
+            Set<BinaryRow> res = new HashSet<>(roGetAll(allRows, clock.now()));
+
+            assertEquals(expected.size(), res.size());
+            // Each expected row must be matched byte-for-byte by exactly one returned row (same comparison as the single-row branch).
+            for (BinaryRow e : expected) {
+                assertEquals(1L, res.stream().filter(r -> r != null && java.util.Arrays.equals(e.bytes(), r.bytes())).count());
+            }
+        } else {
+            BinaryRow res = roGet(br1, clock.now());
+            BinaryRow expected = committed
+                    ? (upsertAfterDelete ? br1 : null)
+                    : (insertFirst ? br1 : null);
+
+            if (expected == null) {
+                assertNull(res);
+            } else {
+                assertArrayEquals(expected.bytes(), res.bytes());
+            }
+        }
+
+        cleanup(tx1);
+    }
+
+    private UUID beginTx() {
+        return Timestamp.nextVersion().toUuid(); // Callers use the UUID form of Timestamp.nextVersion() as a new transaction id.
+    }
+
+    private void upsert(UUID txId, BinaryRow row) {
+        // Submits an RW_UPSERT request for the row within txId to the replica listener; the returned future is not awaited.
+        var request = TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
+                .requestType(RequestType.RW_UPSERT)
+                .transactionId(txId)
+                .binaryRow(row)
+                .term(1L)
+                .commitPartitionId(new TablePartitionId(tblId, partId)).build();
+        partitionReplicaListener.invoke(request);
+    }
+
+    private void delete(UUID txId, BinaryRow row) {
+        // Submits an RW_DELETE request for the row within txId to the replica listener; the returned future is not awaited.
+        var request = TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
+                .requestType(RequestType.RW_DELETE)
+                .transactionId(txId)
+                .binaryRow(row)
+                .term(1L)
+                .commitPartitionId(new TablePartitionId(tblId, partId)).build();
+        partitionReplicaListener.invoke(request);
+    }
+
+    private BinaryRow roGet(BinaryRow row, HybridTimestamp readTimestamp) {
+        // Issues an RO_GET for the row at readTimestamp and blocks until the replica listener's future completes.
+        var request = TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
+                .requestType(RequestType.RO_GET)
+                .readTimestamp(readTimestamp)
+                .binaryRow(row).build();
+        CompletableFuture<Object> response = partitionReplicaListener.invoke(request);
+
+        return (BinaryRow) response.join();
+    }
+
+    private List<BinaryRow> roGetAll(Collection<BinaryRow> rows, HybridTimestamp readTimestamp) {
+        // Issues an RO_GET_ALL for the rows at readTimestamp and blocks on the result; the cast to a row list is unchecked.
+        var request = TABLE_MESSAGES_FACTORY.readOnlyMultiRowReplicaRequest()
+                .requestType(RequestType.RO_GET_ALL)
+                .readTimestamp(readTimestamp)
+                .binaryRows(rows).build();
+        CompletableFuture<Object> response = partitionReplicaListener.invoke(request);
+
+        return (List<BinaryRow>) response.join();
+    }
+
+    private void cleanup(UUID txId) {
+        // Submits a committing cleanup request for txId stamped with clock.now() (future not awaited), then marks txState COMMITED.
+        var request = TX_MESSAGES_FACTORY.txCleanupReplicaRequest()
+                .txId(txId)
+                .commit(true)
+                .commitTimestamp(clock.now())
+                .term(1L).build();
+        partitionReplicaListener.invoke(request);
+
+        txState = TxState.COMMITED;
+    }
+
     private static BinaryTuplePrefix toIndexBound(int val) {
         ByteBuffer tuple = new BinaryTuplePrefixBuilder(1, 1).appendInt(val).build();
 
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
index 983b9133e9..d5bb7aa560 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
@@ -32,5 +32,5 @@ public interface TxStateReplicaRequest extends ReplicaRequest {
     UUID txId();
 
     @Marshallable
-    HybridTimestamp commitTimestamp();
+    HybridTimestamp readTimestamp();
 }