You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by "denis-chudov (via GitHub)" <gi...@apache.org> on 2023/01/31 22:55:04 UTC

[GitHub] [ignite-3] denis-chudov opened a new pull request, #1612: IGNITE-17817 Update ItTablePersistenceTest to use Replica layer with new transaction protocol

denis-chudov opened a new pull request, #1612:
URL: https://github.com/apache/ignite-3/pull/1612

   https://issues.apache.org/jira/browse/IGNITE-17817


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] asfgit closed pull request #1612: IGNITE-17817 Update ItTablePersistenceTest to use Replica layer with new transaction protocol

Posted by "asfgit (via GitHub)" <gi...@apache.org>.
asfgit closed pull request #1612: IGNITE-17817 Update ItTablePersistenceTest to use Replica layer with new transaction protocol
URL: https://github.com/apache/ignite-3/pull/1612


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] alievmirza commented on a diff in pull request #1612: IGNITE-17817 Update ItTablePersistenceTest to use Replica layer with new transaction protocol

Posted by "alievmirza (via GitHub)" <gi...@apache.org>.
alievmirza commented on code in PR #1612:
URL: https://github.com/apache/ignite-3/pull/1612#discussion_r1094106943


##########
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java:
##########
@@ -44,193 +47,305 @@
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.command.HybridTimestampMessage;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryConverter;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.schema.TableRowConverter;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
-import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
+import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
+import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
+import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
-import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
 import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
+import org.apache.ignite.internal.tx.storage.state.test.TestTxStateTableStorage;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
  * Persistent partitions raft group snapshots tests.
  */
-@Disabled("IGNITE-16644, IGNITE-17817 MvPartitionStorage hasn't supported snapshots yet")
+@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class})
 public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<PartitionListener> {
+    /** Factory to create RAFT command messages. */
+    private final TableMessagesFactory msgFactory = new TableMessagesFactory();
+
+    /** Factory for creating replica command messages. */
+    private final ReplicaMessagesFactory replicaMessagesFactory = new ReplicaMessagesFactory();
+
+    @InjectConfiguration("mock.tables.foo = {}")
+    private TablesConfiguration tablesCfg;
+
+    @InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion {size = 16777216, writeBufferSize = 16777216}}")
+    private RocksDbStorageEngineConfiguration engineConfig;
+
     private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
             1,
             new Column[]{new Column("key", NativeTypes.INT64, false)},
             new Column[]{new Column("value", NativeTypes.INT64, false)}
     );
 
-    private static final BinaryConverter keyConverter = BinaryConverter.forKey(SCHEMA);
+    private static final BinaryConverter rowConverter = BinaryConverter.forRow(SCHEMA);
 
-    private static final Row FIRST_KEY = createKeyRow(0);
+    private static final Row FIRST_KEY = createKeyRow(1);
 
-    private static final Row FIRST_VALUE = createKeyValueRow(0, 0);
+    private static final Row FIRST_VALUE = createKeyValueRow(1, 1);
 
-    private static final Row SECOND_KEY = createKeyRow(1);
+    private static final Row SECOND_KEY = createKeyRow(2);
 
-    private static final Row SECOND_VALUE = createKeyValueRow(1, 1);
+    private static final Row SECOND_VALUE = createKeyValueRow(2, 2);
 
-    /**
-     * Paths for created partition listeners.
-     */
+    /** Paths for created partition listeners. */
     private final Map<PartitionListener, Path> paths = new ConcurrentHashMap<>();
 
-    private final List<TxManager> managers = new ArrayList<>();
+    /** Map of node indexes to partition listeners. */
+    private final Map<Integer, PartitionListener> partListeners = new ConcurrentHashMap<>();
+
+    /** Map of node indexes to table storages. */
+    private final Map<Integer, MvTableStorage> mvTableStorages = new ConcurrentHashMap<>();
+
+    /** Map of node indexes to partition storages. */
+    private final Map<Integer, MvPartitionStorage> mvPartitionStorages = new ConcurrentHashMap<>();
+
+    /** Map of node indexes to transaction managers. */
+    private final Map<Integer, TxManager> txManagers = new ConcurrentHashMap<>();
+
+    private ReplicaService replicaService;
 
     private final Function<String, ClusterNode> consistentIdToNode = addr
             -> new ClusterNode("node1", "node1", new NetworkAddress(addr, 3333));
 
-    private final ReplicaService replicaService = mock(ReplicaService.class);
+    private final HybridClock hybridClock = new HybridClockImpl();
+
+    private int stoppedNodeIndex;
+
+    private InternalTable table;
+
+    private final LinkedList<AutoCloseable> closeables = new LinkedList<>();
 
     @BeforeEach
-    void initMocks() {
-        doReturn(CompletableFuture.completedFuture(null)).when(replicaService).invoke(any(), any());
+    @Override
+    public void beforeTest(TestInfo testInfo) {
+        super.beforeTest(testInfo);
+
+        closeables.clear();
     }
 
     @AfterEach
     @Override
     public void afterTest() throws Exception {
         super.afterTest();
 
-        for (TxManager txManager : managers) {
-            txManager.stop();
-        }
+        closeAll(closeables);
     }
 
     /** {@inheritDoc} */
     @Override
     public void beforeFollowerStop(RaftGroupService service, RaftServer server) throws Exception {
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use Replica layer with new transaction protocol.
-        TxManagerImpl txManager = new TxManagerImpl(replicaService, new HeapLockManager(), new HybridClockImpl());
+        PartitionReplicaListener partitionReplicaListener = mockPartitionReplicaListener(service);
+
+        replicaService = mock(ReplicaService.class);
 
-        managers.add(txManager);
+        when(replicaService.invoke(any(), any()))
+                .thenAnswer(invocationOnMock -> partitionReplicaListener.invoke(invocationOnMock.getArgument(1)));
 
-        txManager.start();
+        for (int i = 0; i <= 2; i++) {
+            TxManager txManager = new TxManagerImpl(replicaService, new HeapLockManager(), hybridClock);
+            txManagers.put(i, txManager);
+        }
 
-        var table = new InternalTableImpl(
+        table = new InternalTableImpl(
                 "table",
                 UUID.randomUUID(),
                 Int2ObjectMaps.singleton(0, service),
                 1,
                 consistentIdToNode,
-                txManager,
+                txManagers.get(0),
                 mock(MvTableStorage.class),
-                mock(TxStateTableStorage.class),
+                new TestTxStateTableStorage(),
                 replicaService,
-                mock(HybridClock.class)
+                hybridClock
         );
 
+        closeables.add(() -> table.close());
+
         table.upsert(FIRST_VALUE, null).get();
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public void afterFollowerStop(RaftGroupService service, RaftServer server) throws Exception {
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use Replica layer with new transaction protocol.
-        TxManagerImpl txManager = new TxManagerImpl(replicaService, new HeapLockManager(), new HybridClockImpl());
+    private PartitionReplicaListener mockPartitionReplicaListener(RaftGroupService service) {
+        PartitionReplicaListener partitionReplicaListener = mock(PartitionReplicaListener.class);
+
+        when(partitionReplicaListener.invoke(any())).thenAnswer(invocationOnMock -> {
+            ReplicaRequest req = invocationOnMock.getArgument(0);
+
+            if (req instanceof ReadWriteSingleRowReplicaRequest) {
+                ReadWriteSingleRowReplicaRequest req0 = (ReadWriteSingleRowReplicaRequest) req;
+
+                if (req0.requestType() == RequestType.RW_GET) {
+                    int storageIndex = stoppedNodeIndex == 0 ? 1 : 0;
+                    MvPartitionStorage partitionStorage = mvPartitionStorages.get(storageIndex);
+
+                    Map<ByteBuffer, RowId> primaryIndex = rowsToRowIds(partitionStorage);
+                    RowId rowId = primaryIndex.get(req0.binaryRow().keySlice());
+                    BinaryRow row = rowConverter.fromTuple(partitionStorage.read(rowId, HybridTimestamp.MAX_VALUE).tableRow().tupleSlice());
+
+                    return completedFuture(row);
+                }
+
+                // Non-null binary row if UPSERT, otherwise it's implied that request type is DELETE.
+                BinaryRow binaryRow = req0.requestType() == RequestType.RW_UPSERT ? req0.binaryRow() : null;
+                TableRow tableRow = binaryRow == null ? null : TableRowConverter.fromBinaryRow(binaryRow, rowConverter);
+
+                UpdateCommand cmd = msgFactory.updateCommand()
+                        .txId(req0.transactionId())
+                        .tablePartitionId(tablePartitionId(new TablePartitionId(UUID.randomUUID(), 0)))
+                        .rowUuid(new RowId(0).uuid())
+                        .rowBuffer(tableRow == null ? null : tableRow.byteBuffer())
+                        .safeTime(hybridTimestamp(hybridClock.now()))
+                        .build();
+
+                return service.run(cmd);
+            } else if (req instanceof TxFinishReplicaRequest) {
+                TxFinishReplicaRequest req0 = (TxFinishReplicaRequest) req;
+
+                FinishTxCommand cmd = msgFactory.finishTxCommand()
+                        .txId(req0.txId())
+                        .commit(req0.commit())
+                        .commitTimestamp(hybridTimestamp(req0.commitTimestamp()))
+                        .tablePartitionIds(asList(tablePartitionId(new TablePartitionId(UUID.randomUUID(), 0))))
+                        .safeTime(hybridTimestamp(hybridClock.now()))
+                        .build();
+
+                return service.run(cmd)
+                        .thenCompose(ignored -> {
+                            TxCleanupCommand cleanupCmd = msgFactory.txCleanupCommand()
+                                    .txId(req0.txId())
+                                    .commit(req0.commit())
+                                    .commitTimestamp(hybridTimestamp(req0.commitTimestamp()))
+                                    .safeTime(hybridTimestamp(hybridClock.now()))
+                                    .build();
+
+                            return service.run(cleanupCmd);
+                        });
+            }
 
-        managers.add(txManager);
+            throw new AssertionError("Unexpected request: " + req);
+        });
 
-        txManager.start();
+        return partitionReplicaListener;
+    }
 
-        var table = new InternalTableImpl(
-                "table",
-                UUID.randomUUID(),
-                Int2ObjectMaps.singleton(0, service),
-                1,
-                consistentIdToNode,
-                txManager,
-                mock(MvTableStorage.class),
-                mock(TxStateTableStorage.class),
-                replicaService,
-                mock(HybridClock.class)
-        );
+    /**
+     * Method to convert from {@link HybridTimestamp} object to NetworkMessage-based {@link HybridTimestampMessage} object.
+     *
+     * @param tmstmp {@link HybridTimestamp} object to convert to {@link HybridTimestampMessage}.
+     * @return {@link HybridTimestampMessage} object obtained from {@link HybridTimestamp}.
+     */
+    private HybridTimestampMessage hybridTimestamp(HybridTimestamp tmstmp) {

Review Comment:
   Let's not use an abbreviation here (the `tmstmp` parameter name).



##########
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java:
##########
@@ -44,193 +47,305 @@
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.command.HybridTimestampMessage;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryConverter;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.schema.TableRowConverter;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
-import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
+import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
+import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
+import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
-import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
 import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
+import org.apache.ignite.internal.tx.storage.state.test.TestTxStateTableStorage;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
  * Persistent partitions raft group snapshots tests.
  */
-@Disabled("IGNITE-16644, IGNITE-17817 MvPartitionStorage hasn't supported snapshots yet")
+@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class})
 public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<PartitionListener> {
+    /** Factory to create RAFT command messages. */
+    private final TableMessagesFactory msgFactory = new TableMessagesFactory();
+
+    /** Factory for creating replica command messages. */
+    private final ReplicaMessagesFactory replicaMessagesFactory = new ReplicaMessagesFactory();
+
+    @InjectConfiguration("mock.tables.foo = {}")
+    private TablesConfiguration tablesCfg;
+
+    @InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion {size = 16777216, writeBufferSize = 16777216}}")
+    private RocksDbStorageEngineConfiguration engineConfig;
+
     private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
             1,
             new Column[]{new Column("key", NativeTypes.INT64, false)},
             new Column[]{new Column("value", NativeTypes.INT64, false)}
     );
 
-    private static final BinaryConverter keyConverter = BinaryConverter.forKey(SCHEMA);
+    private static final BinaryConverter rowConverter = BinaryConverter.forRow(SCHEMA);
 
-    private static final Row FIRST_KEY = createKeyRow(0);
+    private static final Row FIRST_KEY = createKeyRow(1);
 
-    private static final Row FIRST_VALUE = createKeyValueRow(0, 0);
+    private static final Row FIRST_VALUE = createKeyValueRow(1, 1);
 
-    private static final Row SECOND_KEY = createKeyRow(1);
+    private static final Row SECOND_KEY = createKeyRow(2);
 
-    private static final Row SECOND_VALUE = createKeyValueRow(1, 1);
+    private static final Row SECOND_VALUE = createKeyValueRow(2, 2);
 
-    /**
-     * Paths for created partition listeners.
-     */
+    /** Paths for created partition listeners. */
     private final Map<PartitionListener, Path> paths = new ConcurrentHashMap<>();
 
-    private final List<TxManager> managers = new ArrayList<>();
+    /** Map of node indexes to partition listeners. */
+    private final Map<Integer, PartitionListener> partListeners = new ConcurrentHashMap<>();
+
+    /** Map of node indexes to table storages. */
+    private final Map<Integer, MvTableStorage> mvTableStorages = new ConcurrentHashMap<>();
+
+    /** Map of node indexes to partition storages. */
+    private final Map<Integer, MvPartitionStorage> mvPartitionStorages = new ConcurrentHashMap<>();
+
+    /** Map of node indexes to transaction managers. */
+    private final Map<Integer, TxManager> txManagers = new ConcurrentHashMap<>();
+
+    private ReplicaService replicaService;
 
     private final Function<String, ClusterNode> consistentIdToNode = addr
             -> new ClusterNode("node1", "node1", new NetworkAddress(addr, 3333));
 
-    private final ReplicaService replicaService = mock(ReplicaService.class);
+    private final HybridClock hybridClock = new HybridClockImpl();
+
+    private int stoppedNodeIndex;
+
+    private InternalTable table;
+
+    private final LinkedList<AutoCloseable> closeables = new LinkedList<>();
 
     @BeforeEach
-    void initMocks() {
-        doReturn(CompletableFuture.completedFuture(null)).when(replicaService).invoke(any(), any());
+    @Override
+    public void beforeTest(TestInfo testInfo) {
+        super.beforeTest(testInfo);
+
+        closeables.clear();
     }
 
     @AfterEach
     @Override
     public void afterTest() throws Exception {
         super.afterTest();
 
-        for (TxManager txManager : managers) {
-            txManager.stop();
-        }
+        closeAll(closeables);
     }
 
     /** {@inheritDoc} */
     @Override
     public void beforeFollowerStop(RaftGroupService service, RaftServer server) throws Exception {
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use Replica layer with new transaction protocol.
-        TxManagerImpl txManager = new TxManagerImpl(replicaService, new HeapLockManager(), new HybridClockImpl());
+        PartitionReplicaListener partitionReplicaListener = mockPartitionReplicaListener(service);
+
+        replicaService = mock(ReplicaService.class);
 
-        managers.add(txManager);
+        when(replicaService.invoke(any(), any()))
+                .thenAnswer(invocationOnMock -> partitionReplicaListener.invoke(invocationOnMock.getArgument(1)));
 
-        txManager.start();
+        for (int i = 0; i <= 2; i++) {

Review Comment:
   We should definitely take the number of nodes from the test configuration instead of hard-coding the loop bound here.



##########
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java:
##########
@@ -44,193 +47,305 @@
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.command.HybridTimestampMessage;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryConverter;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.schema.TableRowConverter;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
-import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
+import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
+import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
+import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
-import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
 import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
+import org.apache.ignite.internal.tx.storage.state.test.TestTxStateTableStorage;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
  * Persistent partitions raft group snapshots tests.
  */
-@Disabled("IGNITE-16644, IGNITE-17817 MvPartitionStorage hasn't supported snapshots yet")
+@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class})
 public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<PartitionListener> {
+    /** Factory to create RAFT command messages. */
+    private final TableMessagesFactory msgFactory = new TableMessagesFactory();
+
+    /** Factory for creating replica command messages. */
+    private final ReplicaMessagesFactory replicaMessagesFactory = new ReplicaMessagesFactory();
+
+    @InjectConfiguration("mock.tables.foo = {}")
+    private TablesConfiguration tablesCfg;
+
+    @InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion {size = 16777216, writeBufferSize = 16777216}}")
+    private RocksDbStorageEngineConfiguration engineConfig;
+
     private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
             1,
             new Column[]{new Column("key", NativeTypes.INT64, false)},
             new Column[]{new Column("value", NativeTypes.INT64, false)}
     );
 
-    private static final BinaryConverter keyConverter = BinaryConverter.forKey(SCHEMA);
+    private static final BinaryConverter rowConverter = BinaryConverter.forRow(SCHEMA);
 
-    private static final Row FIRST_KEY = createKeyRow(0);
+    private static final Row FIRST_KEY = createKeyRow(1);
 
-    private static final Row FIRST_VALUE = createKeyValueRow(0, 0);
+    private static final Row FIRST_VALUE = createKeyValueRow(1, 1);
 
-    private static final Row SECOND_KEY = createKeyRow(1);
+    private static final Row SECOND_KEY = createKeyRow(2);
 
-    private static final Row SECOND_VALUE = createKeyValueRow(1, 1);
+    private static final Row SECOND_VALUE = createKeyValueRow(2, 2);
 
-    /**
-     * Paths for created partition listeners.
-     */
+    /** Paths for created partition listeners. */
     private final Map<PartitionListener, Path> paths = new ConcurrentHashMap<>();
 
-    private final List<TxManager> managers = new ArrayList<>();
+    /** Map of node indexes to partition listeners. */
+    private final Map<Integer, PartitionListener> partListeners = new ConcurrentHashMap<>();
+
+    /** Map of node indexes to table storages. */
+    private final Map<Integer, MvTableStorage> mvTableStorages = new ConcurrentHashMap<>();
+
+    /** Map of node indexes to partition storages. */
+    private final Map<Integer, MvPartitionStorage> mvPartitionStorages = new ConcurrentHashMap<>();
+
+    /** Map of node indexes to transaction managers. */
+    private final Map<Integer, TxManager> txManagers = new ConcurrentHashMap<>();
+
+    private ReplicaService replicaService;
 
     private final Function<String, ClusterNode> consistentIdToNode = addr
             -> new ClusterNode("node1", "node1", new NetworkAddress(addr, 3333));
 
-    private final ReplicaService replicaService = mock(ReplicaService.class);
+    private final HybridClock hybridClock = new HybridClockImpl();
+
+    private int stoppedNodeIndex;
+
+    private InternalTable table;
+
+    private final LinkedList<AutoCloseable> closeables = new LinkedList<>();
 
     @BeforeEach
-    void initMocks() {
-        doReturn(CompletableFuture.completedFuture(null)).when(replicaService).invoke(any(), any());
+    @Override
+    public void beforeTest(TestInfo testInfo) {
+        super.beforeTest(testInfo);
+
+        closeables.clear();
     }
 
     @AfterEach
     @Override
     public void afterTest() throws Exception {
         super.afterTest();
 
-        for (TxManager txManager : managers) {
-            txManager.stop();
-        }
+        closeAll(closeables);
     }
 
     /** {@inheritDoc} */
     @Override
     public void beforeFollowerStop(RaftGroupService service, RaftServer server) throws Exception {
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use Replica layer with new transaction protocol.
-        TxManagerImpl txManager = new TxManagerImpl(replicaService, new HeapLockManager(), new HybridClockImpl());
+        PartitionReplicaListener partitionReplicaListener = mockPartitionReplicaListener(service);
+
+        replicaService = mock(ReplicaService.class);
 
-        managers.add(txManager);
+        when(replicaService.invoke(any(), any()))
+                .thenAnswer(invocationOnMock -> partitionReplicaListener.invoke(invocationOnMock.getArgument(1)));
 
-        txManager.start();
+        for (int i = 0; i <= 2; i++) {
+            TxManager txManager = new TxManagerImpl(replicaService, new HeapLockManager(), hybridClock);
+            txManagers.put(i, txManager);
+        }
 
-        var table = new InternalTableImpl(
+        table = new InternalTableImpl(
                 "table",
                 UUID.randomUUID(),
                 Int2ObjectMaps.singleton(0, service),
                 1,
                 consistentIdToNode,
-                txManager,
+                txManagers.get(0),
                 mock(MvTableStorage.class),
-                mock(TxStateTableStorage.class),
+                new TestTxStateTableStorage(),
                 replicaService,
-                mock(HybridClock.class)
+                hybridClock
         );
 
+        closeables.add(() -> table.close());
+
         table.upsert(FIRST_VALUE, null).get();
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public void afterFollowerStop(RaftGroupService service, RaftServer server) throws Exception {
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use Replica layer with new transaction protocol.
-        TxManagerImpl txManager = new TxManagerImpl(replicaService, new HeapLockManager(), new HybridClockImpl());
+    private PartitionReplicaListener mockPartitionReplicaListener(RaftGroupService service) {
+        PartitionReplicaListener partitionReplicaListener = mock(PartitionReplicaListener.class);
+
+        when(partitionReplicaListener.invoke(any())).thenAnswer(invocationOnMock -> {
+            ReplicaRequest req = invocationOnMock.getArgument(0);
+
+            if (req instanceof ReadWriteSingleRowReplicaRequest) {
+                ReadWriteSingleRowReplicaRequest req0 = (ReadWriteSingleRowReplicaRequest) req;
+
+                if (req0.requestType() == RequestType.RW_GET) {
+                    int storageIndex = stoppedNodeIndex == 0 ? 1 : 0;
+                    MvPartitionStorage partitionStorage = mvPartitionStorages.get(storageIndex);
+
+                    Map<ByteBuffer, RowId> primaryIndex = rowsToRowIds(partitionStorage);
+                    RowId rowId = primaryIndex.get(req0.binaryRow().keySlice());
+                    BinaryRow row = rowConverter.fromTuple(partitionStorage.read(rowId, HybridTimestamp.MAX_VALUE).tableRow().tupleSlice());
+
+                    return completedFuture(row);
+                }
+
+                // Non-null binary row if UPSERT, otherwise it's implied that request type is DELETE.
+                BinaryRow binaryRow = req0.requestType() == RequestType.RW_UPSERT ? req0.binaryRow() : null;
+                TableRow tableRow = binaryRow == null ? null : TableRowConverter.fromBinaryRow(binaryRow, rowConverter);
+
+                UpdateCommand cmd = msgFactory.updateCommand()
+                        .txId(req0.transactionId())
+                        .tablePartitionId(tablePartitionId(new TablePartitionId(UUID.randomUUID(), 0)))
+                        .rowUuid(new RowId(0).uuid())
+                        .rowBuffer(tableRow == null ? null : tableRow.byteBuffer())
+                        .safeTime(hybridTimestamp(hybridClock.now()))
+                        .build();
+
+                return service.run(cmd);
+            } else if (req instanceof TxFinishReplicaRequest) {
+                TxFinishReplicaRequest req0 = (TxFinishReplicaRequest) req;
+
+                FinishTxCommand cmd = msgFactory.finishTxCommand()
+                        .txId(req0.txId())
+                        .commit(req0.commit())
+                        .commitTimestamp(hybridTimestamp(req0.commitTimestamp()))
+                        .tablePartitionIds(asList(tablePartitionId(new TablePartitionId(UUID.randomUUID(), 0))))
+                        .safeTime(hybridTimestamp(hybridClock.now()))
+                        .build();
+
+                return service.run(cmd)
+                        .thenCompose(ignored -> {
+                            TxCleanupCommand cleanupCmd = msgFactory.txCleanupCommand()
+                                    .txId(req0.txId())
+                                    .commit(req0.commit())
+                                    .commitTimestamp(hybridTimestamp(req0.commitTimestamp()))
+                                    .safeTime(hybridTimestamp(hybridClock.now()))
+                                    .build();
+
+                            return service.run(cleanupCmd);
+                        });
+            }
 
-        managers.add(txManager);
+            throw new AssertionError("Unexpected request: " + req);
+        });
 
-        txManager.start();
+        return partitionReplicaListener;
+    }
 
-        var table = new InternalTableImpl(
-                "table",
-                UUID.randomUUID(),
-                Int2ObjectMaps.singleton(0, service),
-                1,
-                consistentIdToNode,
-                txManager,
-                mock(MvTableStorage.class),
-                mock(TxStateTableStorage.class),
-                replicaService,
-                mock(HybridClock.class)
-        );
+    /**
+     * Method to convert from {@link HybridTimestamp} object to NetworkMessage-based {@link HybridTimestampMessage} object.
+     *
+     * @param tmstmp {@link HybridTimestamp} object to convert to {@link HybridTimestampMessage}.
+     * @return {@link HybridTimestampMessage} object obtained from {@link HybridTimestamp}.
+     */
+    private HybridTimestampMessage hybridTimestamp(HybridTimestamp tmstmp) {
+        return tmstmp != null ? replicaMessagesFactory.hybridTimestampMessage()

Review Comment:
   Why couldn't we reuse `org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener#hybridTimestamp`?



##########
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java:
##########
@@ -44,193 +47,305 @@
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.command.HybridTimestampMessage;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryConverter;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.schema.TableRowConverter;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
-import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
+import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
+import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
+import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
-import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
 import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
+import org.apache.ignite.internal.tx.storage.state.test.TestTxStateTableStorage;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
  * Persistent partitions raft group snapshots tests.
  */
-@Disabled("IGNITE-16644, IGNITE-17817 MvPartitionStorage hasn't supported snapshots yet")
+@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class})
 public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<PartitionListener> {
+    /** Factory to create RAFT command messages. */
+    private final TableMessagesFactory msgFactory = new TableMessagesFactory();
+
+    /** Factory for creating replica command messages. */
+    private final ReplicaMessagesFactory replicaMessagesFactory = new ReplicaMessagesFactory();
+
+    @InjectConfiguration("mock.tables.foo = {}")
+    private TablesConfiguration tablesCfg;
+
+    @InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion {size = 16777216, writeBufferSize = 16777216}}")
+    private RocksDbStorageEngineConfiguration engineConfig;
+
     private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
             1,
             new Column[]{new Column("key", NativeTypes.INT64, false)},
             new Column[]{new Column("value", NativeTypes.INT64, false)}
     );
 
-    private static final BinaryConverter keyConverter = BinaryConverter.forKey(SCHEMA);
+    private static final BinaryConverter rowConverter = BinaryConverter.forRow(SCHEMA);
 
-    private static final Row FIRST_KEY = createKeyRow(0);
+    private static final Row FIRST_KEY = createKeyRow(1);
 
-    private static final Row FIRST_VALUE = createKeyValueRow(0, 0);
+    private static final Row FIRST_VALUE = createKeyValueRow(1, 1);
 
-    private static final Row SECOND_KEY = createKeyRow(1);
+    private static final Row SECOND_KEY = createKeyRow(2);
 
-    private static final Row SECOND_VALUE = createKeyValueRow(1, 1);
+    private static final Row SECOND_VALUE = createKeyValueRow(2, 2);
 
-    /**
-     * Paths for created partition listeners.
-     */
+    /** Paths for created partition listeners. */
     private final Map<PartitionListener, Path> paths = new ConcurrentHashMap<>();
 
-    private final List<TxManager> managers = new ArrayList<>();
+    /** Map of node indexes to partition listeners. */
+    private final Map<Integer, PartitionListener> partListeners = new ConcurrentHashMap<>();
+
+    /** Map of node indexes to table storages. */
+    private final Map<Integer, MvTableStorage> mvTableStorages = new ConcurrentHashMap<>();
+
+    /** Map of node indexes to partition storages. */
+    private final Map<Integer, MvPartitionStorage> mvPartitionStorages = new ConcurrentHashMap<>();
+
+    /** Map of node indexes to transaction managers. */
+    private final Map<Integer, TxManager> txManagers = new ConcurrentHashMap<>();
+
+    private ReplicaService replicaService;
 
     private final Function<String, ClusterNode> consistentIdToNode = addr
             -> new ClusterNode("node1", "node1", new NetworkAddress(addr, 3333));
 
-    private final ReplicaService replicaService = mock(ReplicaService.class);
+    private final HybridClock hybridClock = new HybridClockImpl();
+
+    private int stoppedNodeIndex;
+
+    private InternalTable table;
+
+    private final LinkedList<AutoCloseable> closeables = new LinkedList<>();
 
     @BeforeEach
-    void initMocks() {
-        doReturn(CompletableFuture.completedFuture(null)).when(replicaService).invoke(any(), any());
+    @Override
+    public void beforeTest(TestInfo testInfo) {
+        super.beforeTest(testInfo);
+
+        closeables.clear();
     }
 
     @AfterEach
     @Override
     public void afterTest() throws Exception {
         super.afterTest();
 
-        for (TxManager txManager : managers) {
-            txManager.stop();
-        }
+        closeAll(closeables);
     }
 
     /** {@inheritDoc} */
     @Override
     public void beforeFollowerStop(RaftGroupService service, RaftServer server) throws Exception {
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use Replica layer with new transaction protocol.
-        TxManagerImpl txManager = new TxManagerImpl(replicaService, new HeapLockManager(), new HybridClockImpl());
+        PartitionReplicaListener partitionReplicaListener = mockPartitionReplicaListener(service);
+
+        replicaService = mock(ReplicaService.class);
 
-        managers.add(txManager);
+        when(replicaService.invoke(any(), any()))
+                .thenAnswer(invocationOnMock -> partitionReplicaListener.invoke(invocationOnMock.getArgument(1)));
 
-        txManager.start();
+        for (int i = 0; i <= 2; i++) {
+            TxManager txManager = new TxManagerImpl(replicaService, new HeapLockManager(), hybridClock);
+            txManagers.put(i, txManager);
+        }
 
-        var table = new InternalTableImpl(
+        table = new InternalTableImpl(
                 "table",
                 UUID.randomUUID(),
                 Int2ObjectMaps.singleton(0, service),
                 1,
                 consistentIdToNode,
-                txManager,
+                txManagers.get(0),
                 mock(MvTableStorage.class),
-                mock(TxStateTableStorage.class),
+                new TestTxStateTableStorage(),
                 replicaService,
-                mock(HybridClock.class)
+                hybridClock
         );
 
+        closeables.add(() -> table.close());
+
         table.upsert(FIRST_VALUE, null).get();
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public void afterFollowerStop(RaftGroupService service, RaftServer server) throws Exception {
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use Replica layer with new transaction protocol.
-        TxManagerImpl txManager = new TxManagerImpl(replicaService, new HeapLockManager(), new HybridClockImpl());
+    private PartitionReplicaListener mockPartitionReplicaListener(RaftGroupService service) {
+        PartitionReplicaListener partitionReplicaListener = mock(PartitionReplicaListener.class);
+
+        when(partitionReplicaListener.invoke(any())).thenAnswer(invocationOnMock -> {
+            ReplicaRequest req = invocationOnMock.getArgument(0);
+
+            if (req instanceof ReadWriteSingleRowReplicaRequest) {
+                ReadWriteSingleRowReplicaRequest req0 = (ReadWriteSingleRowReplicaRequest) req;
+
+                if (req0.requestType() == RequestType.RW_GET) {
+                    int storageIndex = stoppedNodeIndex == 0 ? 1 : 0;
+                    MvPartitionStorage partitionStorage = mvPartitionStorages.get(storageIndex);
+
+                    Map<ByteBuffer, RowId> primaryIndex = rowsToRowIds(partitionStorage);
+                    RowId rowId = primaryIndex.get(req0.binaryRow().keySlice());
+                    BinaryRow row = rowConverter.fromTuple(partitionStorage.read(rowId, HybridTimestamp.MAX_VALUE).tableRow().tupleSlice());
+
+                    return completedFuture(row);
+                }
+
+                // Non-null binary row if UPSERT, otherwise it's implied that request type is DELETE.
+                BinaryRow binaryRow = req0.requestType() == RequestType.RW_UPSERT ? req0.binaryRow() : null;
+                TableRow tableRow = binaryRow == null ? null : TableRowConverter.fromBinaryRow(binaryRow, rowConverter);
+
+                UpdateCommand cmd = msgFactory.updateCommand()
+                        .txId(req0.transactionId())
+                        .tablePartitionId(tablePartitionId(new TablePartitionId(UUID.randomUUID(), 0)))
+                        .rowUuid(new RowId(0).uuid())
+                        .rowBuffer(tableRow == null ? null : tableRow.byteBuffer())
+                        .safeTime(hybridTimestamp(hybridClock.now()))
+                        .build();
+
+                return service.run(cmd);
+            } else if (req instanceof TxFinishReplicaRequest) {
+                TxFinishReplicaRequest req0 = (TxFinishReplicaRequest) req;
+
+                FinishTxCommand cmd = msgFactory.finishTxCommand()
+                        .txId(req0.txId())
+                        .commit(req0.commit())
+                        .commitTimestamp(hybridTimestamp(req0.commitTimestamp()))
+                        .tablePartitionIds(asList(tablePartitionId(new TablePartitionId(UUID.randomUUID(), 0))))
+                        .safeTime(hybridTimestamp(hybridClock.now()))
+                        .build();
+
+                return service.run(cmd)
+                        .thenCompose(ignored -> {
+                            TxCleanupCommand cleanupCmd = msgFactory.txCleanupCommand()
+                                    .txId(req0.txId())
+                                    .commit(req0.commit())
+                                    .commitTimestamp(hybridTimestamp(req0.commitTimestamp()))
+                                    .safeTime(hybridTimestamp(hybridClock.now()))
+                                    .build();
+
+                            return service.run(cleanupCmd);
+                        });
+            }
 
-        managers.add(txManager);
+            throw new AssertionError("Unexpected request: " + req);
+        });
 
-        txManager.start();
+        return partitionReplicaListener;
+    }
 
-        var table = new InternalTableImpl(
-                "table",
-                UUID.randomUUID(),
-                Int2ObjectMaps.singleton(0, service),
-                1,
-                consistentIdToNode,
-                txManager,
-                mock(MvTableStorage.class),
-                mock(TxStateTableStorage.class),
-                replicaService,
-                mock(HybridClock.class)
-        );
+    /**
+     * Method to convert from {@link HybridTimestamp} object to NetworkMessage-based {@link HybridTimestampMessage} object.
+     *
+     * @param tmstmp {@link HybridTimestamp} object to convert to {@link HybridTimestampMessage}.
+     * @return {@link HybridTimestampMessage} object obtained from {@link HybridTimestamp}.
+     */
+    private HybridTimestampMessage hybridTimestamp(HybridTimestamp tmstmp) {
+        return tmstmp != null ? replicaMessagesFactory.hybridTimestampMessage()
+                .physical(tmstmp.getPhysical())
+                .logical(tmstmp.getLogical())
+                .build()
+                : null;
+    }
 
+    /**
+     * Method to convert from {@link TablePartitionId} object to command-based {@link TablePartitionIdMessage} object.
+     *
+     * @param tablePartId {@link TablePartitionId} object to convert to {@link TablePartitionIdMessage}.
+     * @return {@link TablePartitionIdMessage} object converted from argument.
+     */
+    private TablePartitionIdMessage tablePartitionId(TablePartitionId tablePartId) {

Review Comment:
   Same here: let's make a static method that accepts the message factories and reuses the general logic from `org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener#tablePartitionId`.



##########
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java:
##########
@@ -44,193 +47,305 @@
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.command.HybridTimestampMessage;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryConverter;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.schema.TableRowConverter;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
-import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
+import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
+import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
+import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
-import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
 import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
+import org.apache.ignite.internal.tx.storage.state.test.TestTxStateTableStorage;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
  * Persistent partitions raft group snapshots tests.
  */
-@Disabled("IGNITE-16644, IGNITE-17817 MvPartitionStorage hasn't supported snapshots yet")
+@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class})
 public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<PartitionListener> {
+    /** Factory to create RAFT command messages. */
+    private final TableMessagesFactory msgFactory = new TableMessagesFactory();
+
+    /** Factory for creating replica command messages. */
+    private final ReplicaMessagesFactory replicaMessagesFactory = new ReplicaMessagesFactory();
+
+    @InjectConfiguration("mock.tables.foo = {}")
+    private TablesConfiguration tablesCfg;
+
+    @InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion {size = 16777216, writeBufferSize = 16777216}}")
+    private RocksDbStorageEngineConfiguration engineConfig;
+
     private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
             1,
             new Column[]{new Column("key", NativeTypes.INT64, false)},
             new Column[]{new Column("value", NativeTypes.INT64, false)}
     );
 
-    private static final BinaryConverter keyConverter = BinaryConverter.forKey(SCHEMA);
+    private static final BinaryConverter rowConverter = BinaryConverter.forRow(SCHEMA);
 
-    private static final Row FIRST_KEY = createKeyRow(0);
+    private static final Row FIRST_KEY = createKeyRow(1);
 
-    private static final Row FIRST_VALUE = createKeyValueRow(0, 0);
+    private static final Row FIRST_VALUE = createKeyValueRow(1, 1);
 
-    private static final Row SECOND_KEY = createKeyRow(1);
+    private static final Row SECOND_KEY = createKeyRow(2);
 
-    private static final Row SECOND_VALUE = createKeyValueRow(1, 1);
+    private static final Row SECOND_VALUE = createKeyValueRow(2, 2);
 
-    /**
-     * Paths for created partition listeners.
-     */
+    /** Paths for created partition listeners. */
     private final Map<PartitionListener, Path> paths = new ConcurrentHashMap<>();
 
-    private final List<TxManager> managers = new ArrayList<>();
+    /** Map of node indexes to partition listeners. */
+    private final Map<Integer, PartitionListener> partListeners = new ConcurrentHashMap<>();
+
+    /** Map of node indexes to table storages. */
+    private final Map<Integer, MvTableStorage> mvTableStorages = new ConcurrentHashMap<>();
+
+    /** Map of node indexes to partition storages. */
+    private final Map<Integer, MvPartitionStorage> mvPartitionStorages = new ConcurrentHashMap<>();
+
+    /** Map of node indexes to transaction managers. */
+    private final Map<Integer, TxManager> txManagers = new ConcurrentHashMap<>();
+
+    private ReplicaService replicaService;
 
     private final Function<String, ClusterNode> consistentIdToNode = addr
             -> new ClusterNode("node1", "node1", new NetworkAddress(addr, 3333));
 
-    private final ReplicaService replicaService = mock(ReplicaService.class);
+    private final HybridClock hybridClock = new HybridClockImpl();
+
+    private int stoppedNodeIndex;
+
+    private InternalTable table;
+
+    private final LinkedList<AutoCloseable> closeables = new LinkedList<>();
 
     @BeforeEach
-    void initMocks() {
-        doReturn(CompletableFuture.completedFuture(null)).when(replicaService).invoke(any(), any());
+    @Override
+    public void beforeTest(TestInfo testInfo) {
+        super.beforeTest(testInfo);
+
+        closeables.clear();
     }
 
     @AfterEach
     @Override
     public void afterTest() throws Exception {
         super.afterTest();
 
-        for (TxManager txManager : managers) {
-            txManager.stop();
-        }
+        closeAll(closeables);
     }
 
     /** {@inheritDoc} */
     @Override
     public void beforeFollowerStop(RaftGroupService service, RaftServer server) throws Exception {
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use Replica layer with new transaction protocol.
-        TxManagerImpl txManager = new TxManagerImpl(replicaService, new HeapLockManager(), new HybridClockImpl());
+        PartitionReplicaListener partitionReplicaListener = mockPartitionReplicaListener(service);
+
+        replicaService = mock(ReplicaService.class);
 
-        managers.add(txManager);
+        when(replicaService.invoke(any(), any()))
+                .thenAnswer(invocationOnMock -> partitionReplicaListener.invoke(invocationOnMock.getArgument(1)));
 
-        txManager.start();
+        for (int i = 0; i <= 2; i++) {
+            TxManager txManager = new TxManagerImpl(replicaService, new HeapLockManager(), hybridClock);
+            txManagers.put(i, txManager);
+        }
 
-        var table = new InternalTableImpl(
+        table = new InternalTableImpl(
                 "table",
                 UUID.randomUUID(),
                 Int2ObjectMaps.singleton(0, service),
                 1,
                 consistentIdToNode,
-                txManager,
+                txManagers.get(0),

Review Comment:
   Is it OK that we pass the txManager with index 0 here, even though the node with this manager could be stopped?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org