You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vp...@apache.org on 2022/10/20 14:18:06 UTC

[ignite-3] branch ignite-3.0.0-beta1 updated: IGNITE-17637 Implement a commit partition path write intent resolution logic for RO reads (#1197)

This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch ignite-3.0.0-beta1
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/ignite-3.0.0-beta1 by this push:
     new b689d02c2c IGNITE-17637 Implement a commit partition path write intent resolution logic for RO reads (#1197)
b689d02c2c is described below

commit b689d02c2ce7cd7797dbdfadd1701b4a55df3d64
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Wed Oct 19 12:39:51 2022 +0300

    IGNITE-17637 Implement a commit partition path write intent resolution logic for RO reads (#1197)
    
    (cherry picked from commit 7b0b3395de97db09896272e03322bba302c0b556)
---
 .../distributed/ItTxDistributedTestSingleNode.java |  22 +-
 .../internal/table/distributed/TableManager.java   |  20 +-
 .../replicator/PartitionReplicaListener.java       | 269 ++++++++++-
 .../distributed/replicator/PlacementDriver.java    | 107 +++++
 .../replication/PartitionReplicaListenerTest.java  | 497 +++++++++++++++++++++
 .../table/impl/DummyInternalTableImpl.java         |   6 +-
 .../ignite/internal/tx/message/TxMessageGroup.java |   5 +
 ...essageGroup.java => TxStateReplicaRequest.java} |  29 +-
 8 files changed, 916 insertions(+), 39 deletions(-)

diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index bda8d9fceb..f380f04e36 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.table.TxAbstractTest;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
 import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -77,6 +78,7 @@ import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.NodeFinder;
 import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.TopologyService;
 import org.apache.ignite.raft.client.Peer;
 import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
@@ -119,6 +121,8 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
 
     protected Map<ClusterNode, TxManager> txManagers;
 
+    protected Map<ClusterNode, TopologyService> topologyServices;
+
     protected Int2ObjectOpenHashMap<RaftGroupService> accRaftClients;
 
     protected Int2ObjectOpenHashMap<RaftGroupService> custRaftClients;
@@ -224,6 +228,7 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
         // Start raft servers. Each raft server can hold multiple groups.
         clocks = new HashMap<>(nodes);
         raftServers = new HashMap<>(nodes);
+        topologyServices = new HashMap<>(nodes);
         replicaManagers = new HashMap<>(nodes);
         replicaServices = new HashMap<>(nodes);
         txManagers = new HashMap<>(nodes);
@@ -244,6 +249,8 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
 
             raftServers.put(node, raftSrv);
 
+            topologyServices.put(node, cluster.get(i).topologyService());
+
             ReplicaManager replicaMgr = new ReplicaManager(
                     cluster.get(i),
                     clock,
@@ -362,6 +369,14 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
 
             for (ClusterNode node : partNodes) {
                 var testMpPartStorage = new TestMvPartitionStorage(0);
+                var txSateStorage = new TestConcurrentHashMapTxStateStorage();
+                var placementDriver = new PlacementDriver(replicaServices.get(node));
+
+                for (int part = 0; part < assignment.size(); part++) {
+                    String replicaGrpId = name + "-part-" + part;
+
+                    placementDriver.updateAssignment(replicaGrpId, assignment.get(part));
+                }
 
                 int partId = p;
 
@@ -373,7 +388,7 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
                         () -> {
                             return new PartitionListener(
                                     testMpPartStorage,
-                                    new TestConcurrentHashMapTxStateStorage(),
+                                    txSateStorage,
                                     txManagers.get(node),
                                     primaryIndex
                             );
@@ -393,7 +408,10 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
                                                 grpId,
                                                 tblId,
                                                 primaryIndex,
-                                                clocks.get(node)
+                                                clocks.get(node),
+                                                txSateStorage,
+                                                topologyServices.get(node),
+                                                placementDriver
                                         ));
                             } catch (NodeStoppingException e) {
                                 fail("Unexpected node stopping", e);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 35c33e3372..14a6cfe21a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -123,6 +123,7 @@ import org.apache.ignite.internal.table.distributed.raft.RebalanceRaftGroupEvent
 import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionSnapshotStorageFactory;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.table.event.TableEvent;
 import org.apache.ignite.internal.table.event.TableEventParameters;
@@ -220,6 +221,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
     /** Data storage manager. */
     private final DataStorageManager dataStorageMgr;
 
+    /** Placement driver. */
+    private final PlacementDriver placementDriver;
+
     /** Here a table future stores during creation (until the table can be provided to client). */
     private final Map<UUID, CompletableFuture<Table>> tableCreateFuts = new ConcurrentHashMap<>();
 
@@ -333,6 +337,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         this.volatileLogStorageFactoryCreator = volatileLogStorageFactoryCreator;
         this.clock = clock;
 
+        placementDriver = new PlacementDriver(replicaSvc);
+
         netAddrResolver = addr -> {
             ClusterNode node = topologyService.getByAddress(addr);
 
@@ -679,6 +685,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
                 String grpId = partitionRaftGroupName(tblId, partId);
 
+                placementDriver.updateAssignment(grpId, nodes);
+
                 CompletableFuture<Void> startGroupFut = CompletableFuture.completedFuture(null);
 
                 ConcurrentHashMap<ByteBuffer, RowId> primaryIndex = new ConcurrentHashMap<>();
@@ -786,7 +794,10 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                                                             grpId,
                                                             tblId,
                                                             primaryIndex,
-                                                            clock
+                                                            clock,
+                                                            internalTbl.txStateStorage().getOrCreateTxStateStorage(partId),
+                                                            topologyService,
+                                                            placementDriver
                                                     )
                                             );
                                         } catch (NodeStoppingException ex) {
@@ -1699,6 +1710,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                             ? ((List<Set<ClusterNode>>) ByteUtils.fromBytes(tblCfg.assignments().value())).get(partId)
                             : ByteUtils.fromBytes(stableAssignments);
 
+                    placementDriver.updateAssignment(grpId, assignments);
+
                     ClusterNode localMember = raftMgr.topologyService().localMember();
 
                     var deltaPeers = newPeers.stream()
@@ -1762,7 +1775,10 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                                             grpId,
                                             tblId,
                                             primaryIndex,
-                                            clock
+                                            clock,
+                                            tbl.internalTable().txStateStorage().getOrCreateTxStateStorage(partId),
+                                            raftMgr.topologyService(),
+                                            placementDriver
                                     )
                             );
                         }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 259df84a01..64c8a2079f 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.ignite.hlc.HybridClock;
 import org.apache.ignite.hlc.HybridTimestamp;
@@ -66,13 +67,20 @@ import org.apache.ignite.internal.tx.LockKey;
 import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.LockMode;
 import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.message.TxCleanupReplicaRequest;
 import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.lang.ErrorGroups.Replicator;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteStringFormatter;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.TopologyService;
 import org.apache.ignite.raft.client.Command;
 import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.jetbrains.annotations.NotNull;
@@ -80,6 +88,9 @@ import org.jetbrains.annotations.Nullable;
 
 /** Partition replication listener. */
 public class PartitionReplicaListener implements ReplicaListener {
+    /** Tx messages factory. */
+    private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
+
     /** Replication group id. */
     private final String replicationGroupId;
 
@@ -117,6 +128,24 @@ public class PartitionReplicaListener implements ReplicaListener {
      */
     private final ConcurrentNavigableMap<IgniteUuid, PartitionTimestampCursor> cursors;
 
+    /** Tx state storage. */
+    private final TxStateStorage txStateStorage;
+
+    /** Topology service. */
+    private final TopologyService topologyService;
+
+    /** Hybrid clock. */
+    private final HybridClock hybridClock;
+
+    /** Placement Driver. */
+    private final PlacementDriver placementDriver;
+
+    /**
+     * Futures of in-progress transaction finishes, keyed by transaction id. It is used to coordinate clock updates made on
+     * behalf of read-only transactions with the assignment of a commit timestamp.
+     * TODO: IGNITE-17261 Review this once the commit timestamp is provided by the commit request (request.commitTimestamp()).
+     */
+    ConcurrentHashMap<UUID, CompletableFuture<TxMeta>> txTimestampUpdateMap = new ConcurrentHashMap<>();
+
     /**
      * The constructor.
      *
@@ -124,9 +153,14 @@ public class PartitionReplicaListener implements ReplicaListener {
      * @param raftClient Raft client.
      * @param txManager Transaction manager.
      * @param lockManager Lock manager.
+     * @param partId Partition id.
+     * @param replicationGroupId Replication group id.
      * @param tableId Table id.
      * @param primaryIndex Primary index.
      * @param hybridClock Hybrid clock.
+     * @param txStateStorage Transaction state storage.
+     * @param topologyService Topology service.
+     * @param placementDriver Placement driver.
      */
     public PartitionReplicaListener(
             MvPartitionStorage mvDataStorage,
@@ -137,7 +171,10 @@ public class PartitionReplicaListener implements ReplicaListener {
             String replicationGroupId,
             UUID tableId,
             ConcurrentHashMap<ByteBuffer, RowId> primaryIndex,
-            HybridClock hybridClock
+            HybridClock hybridClock,
+            TxStateStorage txStateStorage,
+            TopologyService topologyService,
+            PlacementDriver placementDriver
     ) {
         this.mvDataStorage = mvDataStorage;
         this.raftClient = raftClient;
@@ -147,6 +184,10 @@ public class PartitionReplicaListener implements ReplicaListener {
         this.replicationGroupId = replicationGroupId;
         this.tableId = tableId;
         this.primaryIndex = primaryIndex;
+        this.hybridClock = hybridClock;
+        this.txStateStorage = txStateStorage;
+        this.topologyService = topologyService;
+        this.placementDriver = placementDriver;
 
         //TODO: IGNITE-17479 Integrate indexes into replicaListener command handlers
         this.indexScanId = new UUID(tableId.getMostSignificantBits(), tableId.getLeastSignificantBits() + 1);
@@ -170,6 +211,10 @@ public class PartitionReplicaListener implements ReplicaListener {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Object> invoke(ReplicaRequest request) {
+        if (request instanceof TxStateReplicaRequest) {
+            return processTxStateReplicaRequest((TxStateReplicaRequest) request);
+        }
+
         return ensureReplicaIsPrimary(request)
                 .thenCompose((ignore) -> {
                     if (request instanceof ReadWriteSingleRowReplicaRequest) {
@@ -200,6 +245,60 @@ public class PartitionReplicaListener implements ReplicaListener {
                 });
     }
 
+    /**
+     * Handles a transaction state request. The leader of the RAFT group is refreshed first: if the local node is the
+     * leader, the transaction meta is resolved locally; otherwise the leader node is returned instead.
+     *
+     * @param request Transaction state request.
+     * @return Future of a tuple {@code (txMeta, null)} when the local node is the leader, or {@code (null, leaderNode)} otherwise.
+     */
+    private CompletableFuture<Object> processTxStateReplicaRequest(TxStateReplicaRequest request) {
+        return raftClient.refreshAndGetLeaderWithTerm().thenCompose(leaderWithTerm -> {
+            NetworkAddress leaderAddr = leaderWithTerm.get1().address();
+
+            boolean localNodeIsLeader = topologyService.localMember().address().equals(leaderAddr);
+
+            if (!localNodeIsLeader) {
+                // Not the leader: hand back the leader node instead of a transaction state.
+                return CompletableFuture.completedFuture(
+                        new IgniteBiTuple<>(null, topologyService.getByAddress(leaderAddr)));
+            }
+
+            // The meta may be null here, which means the transaction is not finished yet.
+            return getTxStateConcurrently(request).thenApply(txMeta -> new IgniteBiTuple<>(txMeta, null));
+        });
+    }
+
+    /**
+     * Gets the meta of a transaction, or {@code null} if the transaction is not completed.
+     *
+     * <p>If a finish of the transaction is in progress on this replica, the returned future completes together with that
+     * finish. Otherwise the meta is read from the transaction state storage; when nothing is stored, the local clock is
+     * updated with the timestamp carried by the request.
+     *
+     * @param txStateReq Transaction state request.
+     * @return Future to transaction state meta or {@code null}.
+     */
+    private CompletableFuture<TxMeta> getTxStateConcurrently(TxStateReplicaRequest txStateReq) {
+        //TODO: IGNITE-17261 review this after the commit timestamp will be provided from a commit request (request.commitTimestamp()).
+        CompletableFuture<TxMeta> txStateFut = new CompletableFuture<>();
+
+        txTimestampUpdateMap.compute(txStateReq.txId(), (uuid, fut) -> {
+            if (fut != null) {
+                // A finish is in flight: answer with its outcome. A failure is propagated too, so the caller never hangs.
+                fut.whenComplete((txMeta, err) -> {
+                    if (err != null) {
+                        txStateFut.completeExceptionally(err);
+                    } else {
+                        txStateFut.complete(txMeta);
+                    }
+                });
+            } else {
+                TxMeta txMeta = txStateStorage.get(txStateReq.txId());
+
+                if (txMeta == null) {
+                    // All future transactions will be committed after the resolution processed.
+                    hybridClock.update(txStateReq.commitTimestamp());
+                }
+
+                txStateFut.complete(txMeta);
+            }
+
+            // Keep the in-flight future (if any) registered: returning null from compute() removes the mapping, so later
+            // requests would miss the pending finish. The finish removes the entry itself once it completes.
+            return fut;
+        });
+
+        return txStateFut;
+    }
+
     /**
      * Processes retrieve batch for read only transaction.
      *
@@ -209,6 +308,7 @@ public class PartitionReplicaListener implements ReplicaListener {
     private CompletableFuture<Object> processReadOnlyScanRetrieveBatchAction(ReadOnlyScanRetrieveBatchReplicaRequest request) {
         UUID txId = request.transactionId();
         int batchCount = request.batchSize();
+        HybridTimestamp timestamp = request.timestamp();
 
         IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
 
@@ -218,7 +318,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                 id -> mvDataStorage.scan(HybridTimestamp.MAX_VALUE));
 
         while (batchRows.size() < batchCount && cursor.hasNext()) {
-            BinaryRow resolvedReadResult = resolveReadResult(cursor.next(), null);
+            BinaryRow resolvedReadResult = resolveReadResult(cursor.next(), timestamp, () -> cursor.committed(timestamp));
 
             if (resolvedReadResult != null) {
                 batchRows.add(resolvedReadResult);
@@ -239,7 +339,7 @@ public class PartitionReplicaListener implements ReplicaListener {
 
         UUID indexId = indexIdOrDefault(indexScanId/*request.indexToUse()*/);
 
-        if (request.requestType() !=  RequestType.RO_GET) {
+        if (request.requestType() != RequestType.RO_GET) {
             throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
                     IgniteStringFormatter.format("Unknown single request [actionType={}]", request.requestType()));
         }
@@ -247,7 +347,21 @@ public class PartitionReplicaListener implements ReplicaListener {
         //TODO: IGNITE-17868 Integrate indexes into rowIds resolution along with proper lock management on search rows.
         RowId rowId = rowIdByKey(indexId, searchKey);
 
-        BinaryRow result = rowId != null ? resolveReadResult(mvDataStorage.read(rowId, request.timestamp()), null) : null;
+        ReadResult readResult = rowId == null ? null : mvDataStorage.read(rowId, request.timestamp());
+
+        BinaryRow result = readResult == null ? null : resolveReadResult(readResult, request.timestamp(), () -> {
+            if (readResult.newestCommitTimestamp() == null) {
+                return null;
+            }
+
+            ReadResult committedReadResult = mvDataStorage.read(rowId, readResult.newestCommitTimestamp());
+
+            assert !committedReadResult.isWriteIntent() :
+                    "The result is not committed [rowId=" + rowId + ", timestamp="
+                            + readResult.newestCommitTimestamp() + ']';
+
+            return committedReadResult.binaryRow();
+        });
 
         return CompletableFuture.completedFuture(result);
     }
@@ -264,7 +378,7 @@ public class PartitionReplicaListener implements ReplicaListener {
 
         UUID indexId = indexIdOrDefault(indexScanId/*request.indexToUse()*/);
 
-        if (request.requestType() !=  RequestType.RO_GET_ALL) {
+        if (request.requestType() != RequestType.RO_GET_ALL) {
             throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
                     IgniteStringFormatter.format("Unknown single request [actionType={}]", request.requestType()));
         }
@@ -275,7 +389,21 @@ public class PartitionReplicaListener implements ReplicaListener {
             //TODO: IGNITE-17868 Integrate indexes into rowIds resolution along with proper lock management on search rows.
             RowId rowId = rowIdByKey(indexId, searchKey);
 
-            result.add(rowId != null ? resolveReadResult(mvDataStorage.read(rowId, request.timestamp()), null) : null);
+            ReadResult readResult = rowId == null ? null : mvDataStorage.read(rowId, request.timestamp());
+
+            result.add(readResult == null ? null : resolveReadResult(readResult, request.timestamp(), () -> {
+                if (readResult.newestCommitTimestamp() == null) {
+                    return null;
+                }
+
+                ReadResult committedReadResult = mvDataStorage.read(rowId, readResult.newestCommitTimestamp());
+
+                assert !committedReadResult.isWriteIntent() :
+                        "The result is not committed [rowId=" + rowId + ", timestamp="
+                                + readResult.newestCommitTimestamp() + ']';
+
+                return committedReadResult.binaryRow();
+            }));
         }
 
         return CompletableFuture.completedFuture(result);
@@ -390,14 +518,7 @@ public class PartitionReplicaListener implements ReplicaListener {
 
         boolean commit = request.commit();
 
-        CompletableFuture<Object> changeStateFuture = raftClient.run(
-                new FinishTxCommand(
-                        txId,
-                        commit,
-                        request.commitTimestamp(),
-                        aggregatedGroupIds
-                )
-        );
+        CompletableFuture<Object> changeStateFuture = finishTransaction(aggregatedGroupIds, txId, commit);
 
         // TODO: https://issues.apache.org/jira/browse/IGNITE-17578 Cleanup process should be asynchronous.
         CompletableFuture[] cleanupFutures = new CompletableFuture[request.groups().size()];
@@ -419,6 +540,38 @@ public class PartitionReplicaListener implements ReplicaListener {
         return allOf(cleanupFutures).thenApply(ignored -> null);
     }
 
+    /**
+     * Finishes a transaction by running a {@link FinishTxCommand} through the RAFT client.
+     *
+     * <p>While the command is in progress, a future of the resulting {@link TxMeta} is registered in
+     * {@code txTimestampUpdateMap} under the transaction id; it is completed and unregistered when the command completes.
+     *
+     * @param aggregatedGroupIds Identifiers of the replication groups enlisted in the transaction.
+     * @param txId Transaction id.
+     * @param commit {@code true} if the transaction is committed, {@code false} otherwise.
+     * @return Future that completes when the finish command completes.
+     */
+    private CompletableFuture<Object> finishTransaction(List<String> aggregatedGroupIds, UUID txId, boolean commit) {
+        // TODO: IGNITE-17261 Timestamp from request is not using until the issue has not been fixed (request.commitTimestamp())
+        CompletableFuture<TxMeta> pendingTxMeta = new CompletableFuture<>();
+
+        // Register before taking the timestamp, so that concurrent state requests observe the pending finish.
+        txTimestampUpdateMap.put(txId, pendingTxMeta);
+
+        HybridTimestamp commitTs = hybridClock.now();
+
+        FinishTxCommand finishCmd = new FinishTxCommand(txId, commit, commitTs, aggregatedGroupIds);
+
+        return raftClient.run(finishCmd).whenComplete((res, err) -> {
+            // NOTE(review): the meta is published even when the command completed with an error - confirm this is intended.
+            TxState finalState = commit ? TxState.COMMITED : TxState.ABORTED;
+
+            pendingTxMeta.complete(new TxMeta(finalState, aggregatedGroupIds, commitTs));
+
+            txTimestampUpdateMap.remove(txId);
+        });
+    }
+
 
     /**
      * Processes transaction cleanup request:
@@ -1155,6 +1308,29 @@ public class PartitionReplicaListener implements ReplicaListener {
         }
     }
 
+    /**
+     * Resolves a read result in the context of an RW transaction.
+     *
+     * @param readResult Read result to resolve.
+     * @param txId Id of the RW transaction performing the read.
+     * @return Resolved binary row.
+     */
+    private BinaryRow resolveReadResult(ReadResult readResult, UUID txId) {
+        // The read timestamp and the committed-row fallback are used by RO transactions only.
+        HybridTimestamp noTimestamp = null;
+        Supplier<BinaryRow> noLastCommitted = null;
+
+        return resolveReadResult(readResult, txId, noTimestamp, noLastCommitted);
+    }
+
+    /**
+     * Resolves a read result in the context of an RO transaction.
+     *
+     * @param readResult Read result to resolve.
+     * @param timestamp Read timestamp of the RO transaction.
+     * @param lastCommitted Action to get the latest committed row.
+     * @return Resolved binary row.
+     */
+    private BinaryRow resolveReadResult(ReadResult readResult, HybridTimestamp timestamp, Supplier<BinaryRow> lastCommitted) {
+        // An RO read is not bound to a transaction id.
+        UUID noTxId = null;
+
+        return resolveReadResult(readResult, noTxId, timestamp, lastCommitted);
+    }
+
     /**
      * Resolves read result to the corresponding binary row. Following rules are used for read result resolution:
      * <ol>
@@ -1167,9 +1343,16 @@ public class PartitionReplicaListener implements ReplicaListener {
      *
      * @param readResult Read result to resolve.
      * @param txId Nullable transaction id, should be provided if resolution is performed within the context of RW transaction.
+     * @param timestamp Timestamp is used in RO transaction only.
+     * @param lastCommitted Action to get the latest committed row, it is used in RO transaction only.
      * @return Resolved binary row.
      */
-    private BinaryRow resolveReadResult(ReadResult readResult, @Nullable UUID txId) {
+    private BinaryRow resolveReadResult(
+            ReadResult readResult,
+            @Nullable UUID txId,
+            @Nullable HybridTimestamp timestamp,
+            @Nullable Supplier<BinaryRow> lastCommitted
+    ) {
         if (readResult == null) {
             return null;
         } else {
@@ -1186,10 +1369,62 @@ public class PartitionReplicaListener implements ReplicaListener {
                             + " actualTxId={" + retrievedResultTxId + '}');
                 }
             } else {
+                if (!readResult.isWriteIntent()) {
+                    return readResult.binaryRow();
+                }
+
+                CompletableFuture<BinaryRow> writeIntentResolutionFut = resolveWriteIntentAsync(
+                        readResult, timestamp, lastCommitted);
+
                 // RO request.
-                // TODO: IGNITE-17637 Implement a commit partition path write intent resolution logic
-                return readResult.binaryRow();
+                return writeIntentResolutionFut.join();
             }
         }
     }
+
+    /**
+     * Resolves a write intent for an RO read by requesting the state of the owning transaction from its commit partition.
+     *
+     * <p>The write intent row is returned only if the transaction is committed with a commit timestamp that is not later
+     * than the read timestamp. In every other case (no state yet, aborted, or committed after the read timestamp) the
+     * result of {@code lastCommitted} is returned, which may be {@code null}.
+     *
+     * @param readResult Read result holding the write intent.
+     * @param timestamp Read timestamp.
+     * @param lastCommitted Action to get a last committed row.
+     * @return Result future.
+     */
+    private CompletableFuture<BinaryRow> resolveWriteIntentAsync(
+            ReadResult readResult,
+            HybridTimestamp timestamp,
+            Supplier<BinaryRow> lastCommitted
+    ) {
+        String commitGrpId = partitionRaftGroupName(readResult.commitTableId(), readResult.commitPartitionId());
+
+        return placementDriver.sendMetaRequest(commitGrpId, FACTORY.txStateReplicaRequest()
+                        .groupId(commitGrpId)
+                        .commitTimestamp(timestamp)
+                        .txId(readResult.transactionId())
+                        .build())
+                .thenApply(txMeta -> {
+                    if (txMeta == null) {
+                        // The transaction is not finished yet, so its write intent is not visible to this read.
+                        return lastCommitted.get();
+                    } else if (txMeta.txState() == TxState.COMMITED && txMeta.commitTimestamp().compareTo(timestamp) <= 0) {
+                        return readResult.binaryRow();
+                    } else {
+                        // A transaction committed after the read timestamp also ends up here, so both final states are
+                        // legal; asserting ABORTED only would fail on that valid outcome.
+                        assert txMeta.txState() == TxState.COMMITED || txMeta.txState() == TxState.ABORTED
+                                : "Unexpected transaction state [state=" + txMeta.txState() + ']';
+
+                        return lastCommitted.get();
+                    }
+                });
+    }
+
+    /**
+     * Compounds a RAFT group unique name of the form {@code <tblId>_part_<partition>}.
+     *
+     * @param tblId Table identifier.
+     * @param partition Partition id (the caller in this class passes the commit partition id, not a partition count).
+     * @return A RAFT group name.
+     */
+    @NotNull
+    private String partitionRaftGroupName(UUID tblId, int partition) {
+        return tblId + "_part_" + partition;
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PlacementDriver.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PlacementDriver.java
new file mode 100644
index 0000000000..fc2e920ce8
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PlacementDriver.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Placement driver: keeps the node assignment of every replication group and routes transaction state requests by it.
+ */
+public class PlacementDriver {
+    /** Assignment nodes per replication group; the first node of a set is the one the next request is sent to. */
+    private final Map<String, LinkedHashSet<ClusterNode>> primaryReplicaMapping = new ConcurrentHashMap<>();
+
+    /** Replication service. */
+    private final ReplicaService replicaService;
+
+    /**
+     * The constructor.
+     *
+     * @param replicaService Replication service.
+     */
+    public PlacementDriver(ReplicaService replicaService) {
+        this.replicaService = replicaService;
+    }
+
+    /**
+     * Sends a transaction state request to the primary replica.
+     *
+     * @param replicaGrp Replication group id; its assignment must have been set through {@link #updateAssignment}.
+     * @param request Status request.
+     * @return Future with the transaction meta; completes exceptionally if the invocation or the response handling fails.
+     */
+    public CompletableFuture<TxMeta> sendMetaRequest(String replicaGrp, TxStateReplicaRequest request) {
+        CompletableFuture<TxMeta> resFut = new CompletableFuture<>();
+
+        sendAndRetry(resFut, replicaGrp, request);
+
+        return resFut;
+    }
+
+    /**
+     * Updates an assignment for the specific replication group.
+     *
+     * @param replicaGrpId Replication group id.
+     * @param assignment Assignment; its iteration order is kept, so its first node is the first one to be tried.
+     */
+    public void updateAssignment(String replicaGrpId, Collection<ClusterNode> assignment) {
+        primaryReplicaMapping.put(replicaGrpId, new LinkedHashSet<>(assignment));
+    }
+
+    /**
+     * Sends the request to the first node of the group assignment. If the response names another node, that node is moved
+     * to the head of the assignment and the request is sent again. A failed invocation or response handling fails {@code resFut}.
+     *
+     * @param resFut Response future.
+     * @param replicaGrp Replication group id.
+     * @param request Request.
+     */
+    private void sendAndRetry(CompletableFuture<TxMeta> resFut, String replicaGrp, TxStateReplicaRequest request) {
+        ClusterNode nodeToSend = primaryReplicaMapping.get(replicaGrp).iterator().next();
+
+        replicaService.invoke(nodeToSend, request).thenAccept(resp -> {
+            assert resp instanceof IgniteBiTuple : "Unsupported response type [type=" + resp.getClass().getSimpleName() + ']';
+
+            IgniteBiTuple<TxMeta, ClusterNode> stateAndLeader = (IgniteBiTuple<TxMeta, ClusterNode>) resp;
+            ClusterNode nextNodeToSend = stateAndLeader.get2();
+
+            if (nextNodeToSend == null) {
+                resFut.complete(stateAndLeader.get1());
+            } else {
+                LinkedHashSet<ClusterNode> newAssignment = new LinkedHashSet<>();
+                newAssignment.add(nextNodeToSend);
+                newAssignment.addAll(primaryReplicaMapping.get(replicaGrp));
+                primaryReplicaMapping.put(replicaGrp, newAssignment);
+                sendAndRetry(resFut, replicaGrp, request);
+            }
+        }).exceptionally(err -> {
+            // Without this stage a failed invocation or a malformed response would leave the caller's future pending forever.
+            resFut.completeExceptionally(err);
+            return null;
+        });
+    }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
new file mode 100644
index 0000000000..f25659b24a
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replication;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.hlc.HybridClock;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
+import org.apache.ignite.internal.schema.marshaller.MarshallerException;
+import org.apache.ignite.internal.schema.marshaller.MarshallerFactory;
+import org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
+import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.impl.HeapLockManager;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.storage.state.test.TestConcurrentHashMapTxStateStorage;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+/** Tests for the partition replica listener. */
+public class PartitionReplicaListenerTest extends IgniteAbstractTest {
+    /** Tx messages factory. */
+    private static final TxMessagesFactory TX_MESSAGES_FACTORY = new TxMessagesFactory();
+
+    /** Table messages factory. */
+    private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new TableMessagesFactory();
+
+    /** Partition id. */
+    private static final int partId = 0;
+
+    /** Table id. */
+    private static final UUID tblId = UUID.randomUUID();
+
+    /** Replication group id. */
+    private static final String grpId = tblId + "_part_" + partId;
+
+    /** Primary index map. */
+    private static final ConcurrentHashMap<ByteBuffer, RowId> primaryIndex = new ConcurrentHashMap<>();
+
+    /** Hybrid clock. */
+    private static final HybridClock clock = new HybridClock();
+
+    /** The storage stores transaction states. */
+    private static final TestConcurrentHashMapTxStateStorage txStateStorage = new TestConcurrentHashMapTxStateStorage();
+
+    /** The storage stores partition data. */
+    private static final TestMvPartitionStorage testMvPartitionStorage = new TestMvPartitionStorage(partId);
+
+    /** Local cluster node. */
+    private static final ClusterNode localNode = new ClusterNode("node1", "node1", NetworkAddress.from("127.0.0.1:127"));
+
+    /** Another (not local) cluster node. */
+    private static final ClusterNode anotherNode = new ClusterNode("node2", "node2", NetworkAddress.from("127.0.0.2:127"));
+
+    private static PlacementDriver placementDriver = mock(PlacementDriver.class);
+
+    @Mock
+    private static RaftGroupService mockRaftClient = mock(RaftGroupService.class);
+
+    @Mock
+    private static TopologyService topologySrv = mock(TopologyService.class);
+
+    /** Default reflection marshaller factory. */
+    protected static MarshallerFactory marshallerFactory;
+
+    /** Schema descriptor for tests. */
+    protected static SchemaDescriptor schemaDescriptor;
+
+    /** Key-value marshaller for tests. */
+    protected static KvMarshaller<TestKey, TestValue> kvMarshaller;
+
+    /** Partition replication listener to test. */
+    private static PartitionReplicaListener partitionReplicaListener;
+
+    /** If true the local replica is considered leader, false otherwise. */
+    private static boolean localLeader;
+
+    /** The state is used to resolve write intent. */
+    private static TxState txState;
+
+    @BeforeAll
+    static void beforeAll() {
+        // The mocked RAFT client names the local or the other node as the leader, depending on the localLeader flag.
+        when(mockRaftClient.refreshAndGetLeaderWithTerm()).thenAnswer(invocationOnMock -> {
+            if (!localLeader) {
+                return CompletableFuture.completedFuture(new IgniteBiTuple<>(new Peer(anotherNode.address()), 1L));
+            }
+
+            return CompletableFuture.completedFuture(new IgniteBiTuple<>(new Peer(localNode.address()), 1L));
+        });
+
+        when(topologySrv.getByAddress(any())).thenAnswer(invocationOnMock -> {
+            NetworkAddress addr = invocationOnMock.getArgument(0);
+            if (addr.equals(anotherNode.address())) {
+                return anotherNode;
+            } else if (addr.equals(localNode.address())) {
+                return localNode;
+            } else {
+                return null;
+            }
+        });
+
+        when(topologySrv.localMember()).thenReturn(localNode);
+
+        HybridTimestamp txFixedTimestamp = clock.now();
+        // The placement driver stub reports the state the current test put into txState, always with the timestamp taken above.
+        when(placementDriver.sendMetaRequest(eq(grpId), any())).thenAnswer(invocationOnMock -> {
+            TxMeta txMeta;
+
+            if (txState == null) {
+                txMeta = null;
+            } else if (txState == TxState.COMMITED) {
+                txMeta = new TxMeta(TxState.COMMITED, Collections.singletonList(grpId), txFixedTimestamp);
+            } else {
+                assert txState == TxState.ABORTED : "Sate is " + txState;
+
+                txMeta = new TxMeta(TxState.ABORTED, Collections.singletonList(grpId), txFixedTimestamp);
+            }
+            return CompletableFuture.completedFuture(txMeta);
+        });
+
+        partitionReplicaListener = new PartitionReplicaListener(
+                testMvPartitionStorage,
+                mockRaftClient,
+                mock(TxManager.class),
+                new HeapLockManager(),
+                partId,
+                grpId,
+                tblId,
+                primaryIndex,
+                clock,
+                txStateStorage,
+                topologySrv,
+                placementDriver
+        );
+
+        marshallerFactory = new ReflectionMarshallerFactory();
+        schemaDescriptor = new SchemaDescriptor(1, new Column[]{
+                new Column("intKey".toUpperCase(Locale.ROOT), NativeTypes.INT32, false),
+                new Column("strKey".toUpperCase(Locale.ROOT), NativeTypes.STRING, false),
+        }, new Column[]{
+                new Column("intVal".toUpperCase(Locale.ROOT), NativeTypes.INT32, false),
+                new Column("strVal".toUpperCase(Locale.ROOT), NativeTypes.STRING, false),
+        });
+        kvMarshaller = marshallerFactory.create(schemaDescriptor, TestKey.class, TestValue.class);
+    }
+
+    @BeforeEach
+    void beforeTest() {
+        // Every test starts with the local node as the leader, no transaction state chosen and an empty primary index.
+        localLeader = true;
+        txState = null;
+        primaryIndex.clear();
+    }
+
+    @Test
+    public void testTxStateReplicaRequestEmptyState() {
+        CompletableFuture<?> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateReplicaRequest()
+                .groupId(grpId)
+                .commitTimestamp(clock.now())
+                .txId(Timestamp.nextVersion().toUuid())
+                .build());
+
+        // The local node is the leader and the transaction id was never stored: neither a state nor a node is expected.
+        IgniteBiTuple<TxMeta, ClusterNode> tuple = (IgniteBiTuple<TxMeta, ClusterNode>) fut.join();
+        assertNull(tuple.get1());
+        assertNull(tuple.get2());
+    }
+
+    @Test
+    public void testTxStateReplicaRequestCommitState() {
+        UUID txId = Timestamp.nextVersion().toUuid();
+
+        // The transaction is stored as committed before the read timestamp is taken from the same clock.
+        txStateStorage.put(txId, new TxMeta(TxState.COMMITED, Collections.singletonList(grpId), clock.now()));
+        HybridTimestamp readTimestamp = clock.now();
+
+        CompletableFuture<?> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateReplicaRequest()
+                .groupId(grpId)
+                .commitTimestamp(readTimestamp)
+                .txId(txId)
+                .build());
+
+        IgniteBiTuple<TxMeta, ClusterNode> tuple = (IgniteBiTuple<TxMeta, ClusterNode>) fut.join();
+
+        assertEquals(TxState.COMMITED, tuple.get1().txState());
+        assertTrue(readTimestamp.compareTo(tuple.get1().commitTimestamp()) > 0);
+        assertNull(tuple.get2());
+    }
+
+    @Test
+    public void testTxStateReplicaRequestMissLeaderMiss() {
+        localLeader = false;
+
+        CompletableFuture<?> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateReplicaRequest()
+                .groupId(grpId)
+                .commitTimestamp(clock.now())
+                .txId(Timestamp.nextVersion().toUuid())
+                .build());
+
+        // With another node reported as the leader, no state is returned; the second tuple element must be set instead.
+        IgniteBiTuple<TxMeta, ClusterNode> tuple = (IgniteBiTuple<TxMeta, ClusterNode>) fut.join();
+        assertNull(tuple.get1());
+        assertNotNull(tuple.get2());
+    }
+
+    @Test
+    public void testReadOnlySingleRowReplicaRequestEmptyResult() {
+        // Nothing is put into the primary index for the generated key, so the read-only get is expected to return null.
+        BinaryRow testBinaryKey = nextBinaryKey();
+
+        CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
+                .groupId(grpId)
+                .timestamp(clock.now())
+                .transactionId(Timestamp.nextVersion().toUuid())
+                .binaryRow(testBinaryKey)
+                .requestType(RequestType.RO_GET)
+                .build());
+
+        BinaryRow binaryRow = (BinaryRow) fut.join();
+        assertNull(binaryRow);
+    }
+
+    @Test
+    public void testReadOnlySingleRowReplicaRequestCommittedResult() {
+        UUID txId = Timestamp.nextVersion().toUuid();
+        BinaryRow testBinaryKey = nextBinaryKey();
+        BinaryRow testBinaryRow = binaryRow(key(testBinaryKey), new TestValue(1, "v1"));
+        var rowId = new RowId(partId);
+
+        // The row is written and committed before the read timestamp is taken, so the read is expected to return it.
+        primaryIndex.put(testBinaryKey.keySlice(), rowId);
+        testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, tblId, partId);
+        testMvPartitionStorage.commitWrite(rowId, clock.now());
+
+        CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
+                .groupId(grpId)
+                .timestamp(clock.now())
+                .transactionId(Timestamp.nextVersion().toUuid())
+                .binaryRow(testBinaryKey)
+                .requestType(RequestType.RO_GET)
+                .build());
+
+        BinaryRow binaryRow = (BinaryRow) fut.join();
+        assertNotNull(binaryRow);
+    }
+
+    @Test
+    public void testReadOnlySingleRowReplicaRequestResolveWriteIntentCommitted() {
+        UUID txId = Timestamp.nextVersion().toUuid();
+        BinaryRow testBinaryKey = nextBinaryKey();
+        BinaryRow testBinaryRow = binaryRow(key(testBinaryKey), new TestValue(1, "v1"));
+        var rowId = new RowId(partId);
+        txState = TxState.COMMITED;
+
+        // Only an uncommitted write exists for the key; the placement driver stub reports its transaction as committed.
+        primaryIndex.put(testBinaryKey.keySlice(), rowId);
+        testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, tblId, partId);
+
+        CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
+                .groupId(grpId)
+                .timestamp(clock.now())
+                .transactionId(Timestamp.nextVersion().toUuid())
+                .binaryRow(testBinaryKey)
+                .requestType(RequestType.RO_GET)
+                .build());
+
+        BinaryRow binaryRow = (BinaryRow) fut.join();
+        assertNotNull(binaryRow);
+    }
+
+    @Test
+    public void testReadOnlySingleRowReplicaRequestResolveWriteIntentPending() {
+        UUID txId = Timestamp.nextVersion().toUuid();
+        BinaryRow testBinaryKey = nextBinaryKey();
+        BinaryRow testBinaryRow = binaryRow(key(testBinaryKey), new TestValue(1, "v1"));
+        var rowId = new RowId(partId);
+
+        // txState stays null here, so the placement driver stub returns no transaction meta for the uncommitted write.
+        primaryIndex.put(testBinaryKey.keySlice(), rowId);
+        testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, tblId, partId);
+
+        CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
+                .groupId(grpId)
+                .timestamp(clock.now())
+                .transactionId(Timestamp.nextVersion().toUuid())
+                .binaryRow(testBinaryKey)
+                .requestType(RequestType.RO_GET)
+                .build());
+
+        BinaryRow binaryRow = (BinaryRow) fut.join();
+        assertNull(binaryRow);
+    }
+
+    @Test
+    public void testReadOnlySingleRowReplicaRequestResolveWriteIntentAborted() {
+        UUID txId = Timestamp.nextVersion().toUuid();
+        BinaryRow testBinaryKey = nextBinaryKey();
+        BinaryRow testBinaryRow = binaryRow(key(testBinaryKey), new TestValue(1, "v1"));
+        var rowId = new RowId(partId);
+        txState = TxState.ABORTED;
+
+        // The placement driver stub reports the writing transaction as aborted, so its uncommitted row must not be returned.
+        primaryIndex.put(testBinaryKey.keySlice(), rowId);
+        testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, tblId, partId);
+
+        CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
+                .groupId(grpId)
+                .timestamp(clock.now())
+                .transactionId(Timestamp.nextVersion().toUuid())
+                .binaryRow(testBinaryKey)
+                .requestType(RequestType.RO_GET)
+                .build());
+
+        BinaryRow binaryRow = (BinaryRow) fut.join();
+        assertNull(binaryRow);
+    }
+
+    protected static BinaryRow nextBinaryKey() {
+        // The nano clock value truncated to an int seeds both fields of the generated key.
+        int seed = (int) System.nanoTime();
+        try {
+            return kvMarshaller.marshal(new TestKey(seed, "key " + seed));
+        } catch (MarshallerException marshalErr) {
+            throw new IgniteException(marshalErr);
+        }
+    }
+
+    protected static BinaryRow binaryRow(TestKey key, TestValue value) {
+        try {
+            return kvMarshaller.marshal(key, value);
+        } catch (MarshallerException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    protected static TestKey key(BinaryRow binaryRow) {
+        try {
+            return kvMarshaller.unmarshalKey(new Row(schemaDescriptor, binaryRow));
+        } catch (MarshallerException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    protected static TestValue value(BinaryRow binaryRow) {
+        try {
+            return kvMarshaller.unmarshalValue(new Row(schemaDescriptor, binaryRow));
+        } catch (MarshallerException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+
+    /**
+     * Test pojo key.
+     */
+    protected static class TestKey {
+        @IgniteToStringInclude
+        public int intKey;
+
+        @IgniteToStringInclude
+        public String strKey;
+
+        public TestKey() {
+        }
+
+        public TestKey(int intKey, String strKey) {
+            this.intKey = intKey;
+            this.strKey = strKey;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            TestKey testKey = (TestKey) o;
+            return intKey == testKey.intKey && Objects.equals(strKey, testKey.strKey);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(intKey, strKey);
+        }
+
+        @Override
+        public String toString() {
+            return S.toString(TestKey.class, this);
+        }
+    }
+
+    /**
+     * Test pojo value.
+     */
+    protected static class TestValue implements Comparable<TestValue> {
+        @IgniteToStringInclude
+        public Integer intVal;
+
+        @IgniteToStringInclude
+        public String strVal;
+
+        public TestValue() {
+        }
+
+        public TestValue(Integer intVal, String strVal) {
+            this.intVal = intVal;
+            this.strVal = strVal;
+        }
+
+        @Override
+        public int compareTo(TestValue o) {
+            int cmp = Integer.compare(intVal, o.intVal);
+
+            return cmp != 0 ? cmp : strVal.compareTo(o.strVal);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            TestValue testValue = (TestValue) o;
+            return Objects.equals(intVal, testValue.intVal) && Objects.equals(strVal, testValue.strVal);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(intVal, strVal);
+        }
+
+        @Override
+        public String toString() {
+            return S.toString(TestValue.class, this);
+        }
+    }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index c33bd9c3f2..9869656427 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -209,7 +209,11 @@ public class DummyInternalTableImpl extends InternalTableImpl {
                 groupId,
                 tableId(),
                 primaryIndex,
-                new HybridClock()
+                new HybridClock(),
+                null,
+                null,
+                null
+
         );
 
         partitionListener = new PartitionListener(
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
index cb920878a6..c9ab6bedec 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
@@ -38,4 +38,9 @@ public class TxMessageGroup {
      * Message type for {@link TxCleanupReplicaRequest}.
      */
     public static final short TX_CLEANUP_REQUEST = 2;
+
+    /**
+     * Message type for {@link TxStateReplicaRequest}.
+     */
+    public static final short TX_STATE_REQUEST = 3;
 }
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
similarity index 60%
copy from modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
copy to modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
index cb920878a6..43a8466cb3 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
@@ -17,25 +17,20 @@
 
 package org.apache.ignite.internal.tx.message;
 
-import org.apache.ignite.network.annotations.MessageGroup;
+import java.util.UUID;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.network.annotations.Marshallable;
+import org.apache.ignite.network.annotations.Transferable;
 
 /**
- * Message types for transactions.
+ * Transaction state request.
  */
-@MessageGroup(groupType = 5, groupName = "TxMessages")
-public class TxMessageGroup {
-    /**
-     * Message type for {@link TxFinishReplicaRequest}.
-     */
-    public static final short TX_FINISH_REQUEST = 0;
+@Transferable(TxMessageGroup.TX_STATE_REQUEST)
+public interface TxStateReplicaRequest extends ReplicaRequest {
+    @Marshallable
+    UUID txId();
 
-    /**
-     * Message type for {@link TxFinishResponse}.
-     */
-    public static final short TX_FINISH_RESPONSE = 1;
-
-    /**
-     * Message type for {@link TxCleanupReplicaRequest}.
-     */
-    public static final short TX_CLEANUP_REQUEST = 2;
+    @Marshallable
+    HybridTimestamp commitTimestamp();
 }