You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2022/10/28 12:55:11 UTC

[ignite-3] branch main updated: IGNITE-17302 add the propagation of the maximal storages' lastAppliedIndex on onSnapshotSave (#1263)

This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 81e46457eb IGNITE-17302 add the propagation of the maximal storages' lastAppliedIndex on onSnapshotSave (#1263)
81e46457eb is described below

commit 81e46457eb19f347a3f9f3f91855295fc71c4c49
Author: Mirza Aliev <al...@gmail.com>
AuthorDate: Fri Oct 28 15:55:06 2022 +0300

    IGNITE-17302 add the propagation of the maximal storages' lastAppliedIndex on onSnapshotSave (#1263)
---
 .../table/distributed/raft/PartitionListener.java  |  69 ++++++++--
 .../snapshot/PartitionSnapshotStorageFactory.java  |  12 +-
 .../snapshot/outgoing/OutgoingSnapshotReader.java  |   8 +-
 .../raft/PartitionCommandListenerTest.java         | 146 +++++++++++++++++++--
 .../PartitionSnapshotStorageFactoryTest.java       |  81 ++++++++++++
 .../outgoing/OutgoingSnapshotReaderTest.java       |  73 +++++++++++
 .../state/rocksdb/TxStateRocksDbStorage.java       |   7 +-
 .../tx/storage/state/test/TestTxStateStorage.java  |  35 ++---
 8 files changed, 389 insertions(+), 42 deletions(-)

diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index c4818d018a..c440e24a88 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -31,6 +31,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.lock.AutoLockup;
@@ -97,7 +98,6 @@ public class PartitionListener implements RaftGroupListener {
         this.indexes = indexes;
     }
 
-    /** {@inheritDoc} */
     @Override
     public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
         iterator.forEachRemaining((CommandClosure<? extends ReadCommand> clo) -> {
@@ -107,7 +107,6 @@ public class PartitionListener implements RaftGroupListener {
         });
     }
 
-    /** {@inheritDoc} */
     @Override
     public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
         iterator.forEachRemaining((CommandClosure<? extends WriteCommand> clo) -> {
@@ -115,11 +114,14 @@ public class PartitionListener implements RaftGroupListener {
 
             long commandIndex = clo.index();
 
-            long storageAppliedIndex = storage.lastAppliedIndex();
+            // We compare against the minimum applied index because local recovery also starts from the minimum one, so that no data
+            // is lost for the storage that lags behind.
+            long storagesAppliedIndex = Math.min(storage.lastAppliedIndex(), txStateStorage.lastAppliedIndex());
 
-            assert storageAppliedIndex < commandIndex
-                    : "Pending write command has a higher index than already processed commands [commandIndex=" + commandIndex
-                    + ", storageAppliedIndex=" + storageAppliedIndex + ']';
+            assert commandIndex > storagesAppliedIndex :
+                    "Write command must have an index greater than that of storages [commandIndex=" + commandIndex
+                            + ", mvAppliedIndex=" + storage.lastAppliedIndex()
+                            + ", txStateAppliedIndex=" + txStateStorage.lastAppliedIndex() + "]";
 
             try (AutoLockup ignoredPartitionSnapshotsReadLockup = storage.acquirePartitionSnapshotsReadLock()) {
                 if (command instanceof UpdateCommand) {
@@ -135,6 +137,7 @@ public class PartitionListener implements RaftGroupListener {
                 } else {
                     assert false : "Command was not found [cmd=" + command + ']';
                 }
+
                 clo.result(null);
             } catch (IgniteInternalException e) {
                 clo.result(e);
@@ -146,8 +149,14 @@ public class PartitionListener implements RaftGroupListener {
      * Handler for the {@link UpdateCommand}.
      *
      * @param cmd Command.
+     * @param commandIndex Index of the RAFT command.
      */
     private void handleUpdateCommand(UpdateCommand cmd, long commandIndex) {
+        // Skips the write command because the storage has already executed it.
+        if (commandIndex <= storage.lastAppliedIndex()) {
+            return;
+        }
+
         storage.runConsistently(() -> {
             BinaryRow row = cmd.getRow();
             RowId rowId = cmd.getRowId();
@@ -171,8 +180,14 @@ public class PartitionListener implements RaftGroupListener {
      * Handler for the {@link UpdateAllCommand}.
      *
      * @param cmd Command.
+     * @param commandIndex Index of the RAFT command.
      */
     private void handleUpdateAllCommand(UpdateAllCommand cmd, long commandIndex) {
+        // Skips the write command because the storage has already executed it.
+        if (commandIndex <= storage.lastAppliedIndex()) {
+            return;
+        }
+
         storage.runConsistently(() -> {
             UUID txId = cmd.txId();
             Map<RowId, BinaryRow> rowsToUpdate = cmd.getRowsToUpdate();
@@ -191,6 +206,7 @@ public class PartitionListener implements RaftGroupListener {
                     addToIndexes(row, rowId);
                 }
             }
+
             storage.lastAppliedIndex(commandIndex);
 
             return null;
@@ -200,11 +216,16 @@ public class PartitionListener implements RaftGroupListener {
     /**
      * Handler for the {@link FinishTxCommand}.
      *
-     * @param cmd          Command.
+     * @param cmd Command.
      * @param commandIndex Index of the RAFT command.
      * @throws IgniteInternalException if an exception occurred during a transaction state change.
      */
     private void handleFinishTxCommand(FinishTxCommand cmd, long commandIndex) throws IgniteInternalException {
+        // Skips the write command because the storage has already executed it.
+        if (commandIndex <= txStateStorage.lastAppliedIndex()) {
+            return;
+        }
+
         UUID txId = cmd.txId();
 
         TxState stateToSet = cmd.commit() ? TxState.COMMITED : TxState.ABORTED;
@@ -250,8 +271,14 @@ public class PartitionListener implements RaftGroupListener {
      * Handler for the {@link TxCleanupCommand}.
      *
      * @param cmd Command.
+     * @param commandIndex Index of the RAFT command.
      */
     private void handleTxCleanupCommand(TxCleanupCommand cmd, long commandIndex) {
+        // Skips the write command because the storage has already executed it.
+        if (commandIndex <= storage.lastAppliedIndex()) {
+            return;
+        }
+
         storage.runConsistently(() -> {
             UUID txId = cmd.txId();
 
@@ -283,19 +310,39 @@ public class PartitionListener implements RaftGroupListener {
         // No-op.
     }
 
-    /** {@inheritDoc} */
     @Override
     public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
-        storage.flush();
+        // Propagating the maximal index to all storages is required for local recovery: without it a node may falsely fail to
+        // recover even though it actually has all the required data. This might happen because we use the minimal index
+        // among storages on a node restart.
+        // Let's consider a more detailed example:
+        //      1) If we don't propagate the maximal lastAppliedIndex among storages and onSnapshotSave finishes, the raft log is
+        //         truncated up to the maximal lastAppliedIndex.
+        //      2) An unexpected cluster restart happens.
+        //      3) Local recovery of a node starts, and we request data from the minimal lastAppliedIndex among storages, because
+        //         some data for some node might not have been flushed before the unexpected cluster restart.
+        //      4) When we try to restore data starting from the minimal lastAppliedIndex, we come to the situation
+        //         that a raft node doesn't have such data, because the truncation up to the maximal lastAppliedIndex from 1) has happened.
+        //      5) The node cannot finish local recovery.
+        long maxLastAppliedIndex = Math.max(storage.lastAppliedIndex(), txStateStorage.lastAppliedIndex());
+
+        storage.runConsistently(() -> {
+            storage.lastAppliedIndex(maxLastAppliedIndex);
+
+            return null;
+        });
+
+        txStateStorage.lastAppliedIndex(maxLastAppliedIndex);
+
+        CompletableFuture.allOf(storage.flush(), txStateStorage.flush())
+                .whenComplete((unused, throwable) -> doneClo.accept(throwable));
     }
 
-    /** {@inheritDoc} */
     @Override
     public boolean onSnapshotLoad(Path path) {
         return true;
     }
 
-    /** {@inheritDoc} */
     @Override
     public void onShutdown() {
         // TODO: IGNITE-17958 - probably, we should not close the storage here as PartitionListener did not create the storage.
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
index e0301bf1bf..ea93af3048 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
@@ -26,7 +26,6 @@ import org.apache.ignite.network.TopologyService;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
 import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
 import org.apache.ignite.raft.jraft.option.RaftOptions;
-import org.apache.ignite.raft.jraft.storage.SnapshotStorage;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
 
@@ -56,7 +55,7 @@ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory {
     /** List of learners. */
     private final List<String> learners;
 
-    /** RAFT log index read from {@link MvPartitionStorage#persistedIndex()} during factory instantiation. */
+    /** RAFT log index: the minimum of the persisted indexes of the partition storages, read during factory instantiation. */
     private final long persistedRaftIndex;
 
     /** Incoming snapshots executor. */
@@ -89,11 +88,16 @@ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory {
         this.learners = learners;
         this.incomingSnapshotsExecutor = incomingSnapshotsExecutor;
 
-        persistedRaftIndex = partition.mvPartitionStorage().persistedIndex();
+        // We must choose the minimum applied index for local recovery so that we don't skip the raft commands for the storage with the
+        // lowest applied index and thus no data loss occurs.
+        persistedRaftIndex = Math.min(
+                partition.mvPartitionStorage().persistedIndex(),
+                partition.txStatePartitionStorage().persistedIndex()
+        );
     }
 
     @Override
-    public SnapshotStorage createSnapshotStorage(String uri, RaftOptions raftOptions) {
+    public PartitionSnapshotStorage createSnapshotStorage(String uri, RaftOptions raftOptions) {
         SnapshotMeta snapshotMeta = new RaftMessagesFactory().snapshotMeta()
                 .lastIncludedIndex(persistedRaftIndex)
                 // According to the code of org.apache.ignite.raft.jraft.core.NodeImpl.bootstrap, it's "dangerous" to init term with a value
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java
index d6201de877..7cc345f054 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java
@@ -47,10 +47,16 @@ public class OutgoingSnapshotReader extends SnapshotReader {
     public OutgoingSnapshotReader(PartitionSnapshotStorage snapshotStorage) {
         this.snapshotStorage = snapshotStorage;
 
+        // We have to choose the maximum applied index because we have to send a snapshot with all the latest changes in the storages.
+        long lastIncludedIndex = Math.max(
+                snapshotStorage.partition().mvPartitionStorage().persistedIndex(),
+                snapshotStorage.partition().txStatePartitionStorage().persistedIndex()
+        );
+
         //TODO https://issues.apache.org/jira/browse/IGNITE-17935
         // This meta is wrong, we need a right one.
         snapshotMeta = new RaftMessagesFactory().snapshotMeta()
-                .lastIncludedIndex(snapshotStorage.partition().mvPartitionStorage().persistedIndex())
+                .lastIncludedIndex(lastIncludedIndex)
                 .lastIncludedTerm(snapshotStorage.startupSnapshotMeta().lastIncludedTerm())
                 .peersList(snapshotStorage.startupSnapshotMeta().peersList())
                 .learnersList(snapshotStorage.startupSnapshotMeta().learnersList())
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index c45a5ddd29..4d2e345be6 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -17,15 +17,25 @@
 
 package org.apache.ignite.internal.table.distributed.raft;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.Serializable;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -43,6 +53,7 @@ import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTupleSchema;
@@ -53,18 +64,25 @@ import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
 import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
 import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
 import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.tx.Timestamp;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.network.ClusterService;
@@ -72,14 +90,15 @@ import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.raft.client.Command;
 import org.apache.ignite.raft.client.WriteCommand;
 import org.apache.ignite.raft.client.service.CommandClosure;
-import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
 
 /**
  * Tests for the table command listener.
  */
+@ExtendWith(WorkDirectoryExtension.class)
 public class PartitionCommandListenerTest {
     /** Key count. */
     public static final int KEY_COUNT = 100;
@@ -116,22 +135,31 @@ public class PartitionCommandListenerTest {
     );
 
     /** Partition storage. */
-    private final MvPartitionStorage mvPartitionStorage = new TestMvPartitionStorage(PARTITION_ID);
+    private final MvPartitionStorage mvPartitionStorage = spy(new TestMvPartitionStorage(PARTITION_ID));
+
+    /** Transaction meta storage. */
+    private final TxStateStorage txStateStorage = spy(new TestTxStateStorage());
+
+    /** Work directory. */
+    @WorkDirectory
+    private Path workDir;
 
     /**
      * Initializes a table listener before tests.
      */
     @BeforeEach
     public void before() {
-        ClusterService clusterService = Mockito.mock(ClusterService.class, RETURNS_DEEP_STUBS);
         NetworkAddress addr = new NetworkAddress("127.0.0.1", 5003);
-        Mockito.when(clusterService.topologyService().localMember().address()).thenReturn(addr);
 
-        ReplicaService replicaService = Mockito.mock(ReplicaService.class, RETURNS_DEEP_STUBS);
+        ClusterService clusterService = mock(ClusterService.class, RETURNS_DEEP_STUBS);
+
+        when(clusterService.topologyService().localMember().address()).thenReturn(addr);
+
+        ReplicaService replicaService = mock(ReplicaService.class, RETURNS_DEEP_STUBS);
 
         commandListener = new PartitionListener(
                 new TestPartitionDataStorage(mvPartitionStorage),
-                new TestTxStateStorage(),
+                txStateStorage,
                 new TxManagerImpl(replicaService, new HeapLockManager(), new HybridClockImpl()),
                 () -> Map.of(pkStorage.id(), pkStorage)
         );
@@ -207,6 +235,108 @@ public class PartitionCommandListenerTest {
         readAndCheck(false);
     }
 
+    /**
+     * The test checks that {@link PartitionListener#onSnapshotSave(Path, Consumer)} propagates
+     * the maximal last applied index among storages to all storages.
+     */
+    @Test
+    public void testOnSnapshotSavePropagateLastAppliedIndex() {
+        ReplicaService replicaService = mock(ReplicaService.class, RETURNS_DEEP_STUBS);
+
+        TestPartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(mvPartitionStorage);
+
+        PartitionListener testCommandListener = new PartitionListener(
+                partitionDataStorage,
+                txStateStorage,
+                new TxManagerImpl(replicaService, new HeapLockManager(), new HybridClockImpl()),
+                () -> Map.of(pkStorage.id(), pkStorage)
+        );
+
+        txStateStorage.lastAppliedIndex(3L);
+
+        partitionDataStorage.lastAppliedIndex(5L);
+
+        AtomicLong counter = new AtomicLong(0);
+
+        testCommandListener.onSnapshotSave(workDir, (throwable) -> counter.incrementAndGet());
+
+        assertEquals(1L, counter.get());
+
+        assertEquals(5L, partitionDataStorage.lastAppliedIndex());
+
+        assertEquals(5L, txStateStorage.lastAppliedIndex());
+
+        txStateStorage.lastAppliedIndex(10L);
+
+        partitionDataStorage.lastAppliedIndex(7L);
+
+        testCommandListener.onSnapshotSave(workDir, (throwable) -> counter.incrementAndGet());
+
+        assertEquals(2L, counter.get());
+
+        assertEquals(10L, partitionDataStorage.lastAppliedIndex());
+
+        assertEquals(10L, txStateStorage.lastAppliedIndex());
+    }
+
+    @Test
+    void testSkipWriteCommandByAppliedIndex() {
+        mvPartitionStorage.lastAppliedIndex(10L);
+
+        ArgumentCaptor<Throwable> commandClosureResultCaptor = ArgumentCaptor.forClass(Throwable.class);
+
+        // Checks for MvPartitionStorage.
+        commandListener.onWrite(List.of(
+                writeCommandCommandClosure(3, mock(UpdateCommand.class), commandClosureResultCaptor),
+                writeCommandCommandClosure(10, mock(UpdateCommand.class), commandClosureResultCaptor),
+                writeCommandCommandClosure(4, mock(TxCleanupCommand.class), commandClosureResultCaptor),
+                writeCommandCommandClosure(5, mock(SafeTimeSyncCommand.class), commandClosureResultCaptor)
+        ).iterator());
+
+        verify(mvPartitionStorage, never()).runConsistently(any(WriteClosure.class));
+        verify(mvPartitionStorage, times(1)).lastAppliedIndex(anyLong());
+
+        assertThat(commandClosureResultCaptor.getAllValues(), containsInAnyOrder(new Throwable[]{null, null, null, null}));
+
+        // Checks for TxStateStorage.
+        mvPartitionStorage.lastAppliedIndex(1L);
+        txStateStorage.lastAppliedIndex(10L);
+
+        commandClosureResultCaptor = ArgumentCaptor.forClass(Throwable.class);
+
+        commandListener.onWrite(List.of(
+                writeCommandCommandClosure(2, mock(FinishTxCommand.class), commandClosureResultCaptor),
+                writeCommandCommandClosure(10, mock(FinishTxCommand.class), commandClosureResultCaptor)
+        ).iterator());
+
+        verify(txStateStorage, never()).compareAndSet(any(UUID.class), any(TxState.class), any(TxMeta.class), anyLong());
+        verify(txStateStorage, times(1)).lastAppliedIndex(anyLong());
+
+        assertThat(commandClosureResultCaptor.getAllValues(), containsInAnyOrder(new Throwable[]{null, null}));
+    }
+
+    /**
+     * Create a command closure.
+     *
+     * @param index Index of the RAFT command.
+     * @param writeCommand Write command.
+     * @param resultClosureCaptor Captor for {@link CommandClosure#result(Serializable)}
+     */
+    private static CommandClosure<WriteCommand> writeCommandCommandClosure(
+            long index,
+            WriteCommand writeCommand,
+            ArgumentCaptor<Throwable> resultClosureCaptor
+    ) {
+        CommandClosure<WriteCommand> commandClosure = mock(CommandClosure.class);
+
+        when(commandClosure.index()).thenReturn(index);
+        when(commandClosure.command()).thenReturn(writeCommand);
+
+        doNothing().when(commandClosure).result(resultClosureCaptor.capture());
+
+        return commandClosure;
+    }
+
     /**
      * Prepares a closure iterator for a specific batch operation.
      *
@@ -451,7 +581,6 @@ public class PartitionCommandListenerTest {
      *
      * @return Row.
      */
-    @NotNull
     private Row getTestKey(int key) {
         RowAssembler rowBuilder = new RowAssembler(SCHEMA, 0, 0);
 
@@ -465,7 +594,6 @@ public class PartitionCommandListenerTest {
      *
      * @return Row.
      */
-    @NotNull
     private Row getTestRow(int key, int val) {
         RowAssembler rowBuilder = new RowAssembler(SCHEMA, 0, 0);
 
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactoryTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactoryTest.java
new file mode 100644
index 0000000000..5fe7f4e324
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactoryTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.concurrent.Executor;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
+import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * For testing {@link PartitionSnapshotStorageFactory}.
+ */
+public class PartitionSnapshotStorageFactoryTest {
+    @Test
+    void testForChoosingMinimumAppliedIndexForMeta() {
+        MvPartitionStorage mvPartitionStorage = new TestMvPartitionStorage(0);
+        TxStateStorage txStateStorage = new TestTxStateStorage();
+
+        PartitionAccess partitionAccess = mock(PartitionAccess.class);
+
+        when(partitionAccess.mvPartitionStorage()).thenReturn(mvPartitionStorage);
+        when(partitionAccess.txStatePartitionStorage()).thenReturn(txStateStorage);
+
+        mvPartitionStorage.lastAppliedIndex(10L);
+        txStateStorage.lastAppliedIndex(5L);
+
+        PartitionSnapshotStorageFactory partitionSnapshotStorageFactory = new PartitionSnapshotStorageFactory(
+                mock(TopologyService.class),
+                mock(OutgoingSnapshotsManager.class),
+                partitionAccess,
+                List.of(),
+                List.of(),
+                mock(Executor.class)
+        );
+
+        PartitionSnapshotStorage snapshotStorage = partitionSnapshotStorageFactory.createSnapshotStorage("", mock(RaftOptions.class));
+
+        assertEquals(5L, snapshotStorage.startupSnapshotMeta().lastIncludedIndex());
+
+        mvPartitionStorage.lastAppliedIndex(1L);
+        txStateStorage.lastAppliedIndex(2L);
+
+        partitionSnapshotStorageFactory = new PartitionSnapshotStorageFactory(
+                mock(TopologyService.class),
+                mock(OutgoingSnapshotsManager.class),
+                partitionAccess,
+                List.of(),
+                List.of(),
+                mock(Executor.class)
+        );
+
+        snapshotStorage = partitionSnapshotStorageFactory.createSnapshotStorage("", mock(RaftOptions.class));
+
+        assertEquals(1L, snapshotStorage.startupSnapshotMeta().lastIncludedIndex());
+    }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
new file mode 100644
index 0000000000..7485acc54c
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionSnapshotStorage;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
+import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * For {@link OutgoingSnapshotReader} testing.
+ */
+public class OutgoingSnapshotReaderTest {
+    @Test
+    void testForChoosingMaximumAppliedIndexForMeta() {
+        MvPartitionStorage mvPartitionStorage = new TestMvPartitionStorage(0);
+        TxStateStorage txStateStorage = new TestTxStateStorage();
+
+        PartitionAccess partitionAccess = mock(PartitionAccess.class);
+
+        when(partitionAccess.mvPartitionStorage()).thenReturn(mvPartitionStorage);
+        when(partitionAccess.txStatePartitionStorage()).thenReturn(txStateStorage);
+        when(partitionAccess.partitionKey()).thenReturn(new PartitionKey(UUID.randomUUID(), 0));
+
+        PartitionSnapshotStorage snapshotStorage = new PartitionSnapshotStorage(
+                mock(TopologyService.class),
+                mock(OutgoingSnapshotsManager.class),
+                "",
+                mock(RaftOptions.class),
+                partitionAccess,
+                mock(SnapshotMeta.class),
+                mock(Executor.class)
+        );
+
+        mvPartitionStorage.lastAppliedIndex(10L);
+        txStateStorage.lastAppliedIndex(5L);
+
+        assertEquals(10L, new OutgoingSnapshotReader(snapshotStorage).load().lastIncludedIndex());
+
+        mvPartitionStorage.lastAppliedIndex(1L);
+        txStateStorage.lastAppliedIndex(2L);
+
+        assertEquals(2L, new OutgoingSnapshotReader(snapshotStorage).load().lastIncludedIndex());
+    }
+}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
index dfc5fc0813..39a7214214 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
@@ -165,7 +166,7 @@ public class TxStateRocksDbStorage implements TxStateStorage {
     }
 
     @Override
-    public boolean compareAndSet(UUID txId, TxState txStateExpected, TxMeta txMeta, long commandIndex) {
+    public boolean compareAndSet(UUID txId, @Nullable TxState txStateExpected, TxMeta txMeta, long commandIndex) {
         requireNonNull(txMeta);
 
         if (!busyLock.enterBusy()) {
@@ -205,6 +206,8 @@ public class TxStateRocksDbStorage implements TxStateStorage {
 
             db.write(writeOptions, writeBatch);
 
+            lastAppliedIndex = commandIndex;
+
             return result;
         } catch (RocksDBException e) {
             throw new IgniteInternalException(
@@ -310,6 +313,8 @@ public class TxStateRocksDbStorage implements TxStateStorage {
 
         try {
             db.put(lastAppliedIndexKey, longToBytes(lastAppliedIndex));
+
+            this.lastAppliedIndex = lastAppliedIndex;
         } catch (RocksDBException e) {
             throw new IgniteInternalException(
                     TX_STATE_STORAGE_ERR,
diff --git a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java
index 9c119938f9..7091adca01 100644
--- a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java
+++ b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java
@@ -31,7 +31,7 @@ import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.tx.storage.state.UnsignedUuidComparator;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Test implementation of {@link TxStateStorage} based on {@link ConcurrentSkipListMap}.
@@ -53,29 +53,32 @@ public class TestTxStateStorage implements TxStateStorage {
     }
 
     @Override
-    public boolean compareAndSet(UUID txId, TxState txStateExpected, @NotNull TxMeta txMeta, long commandIndex) {
-        while (true) {
-            TxMeta old = storage.get(txId);
-
-            if (old == null && txStateExpected == null) {
-                TxMeta oldMeta = storage.putIfAbsent(txId, txMeta);
-                if (oldMeta == null) {
-                    return true;
-                } else {
-                    return false;
-                }
-            } else if (old != null) {
-                if (old.txState() == txStateExpected) {
-                    if (storage.replace(txId, old, txMeta)) {
-                        return true;
-                    }
-                } else {
-                    return old.txState() == txMeta.txState() && (
-                            (old.commitTimestamp() == null && txMeta.commitTimestamp() == null)
-                                    || old.commitTimestamp().equals(txMeta.commitTimestamp()));
-                }
-            }
-        }
+    public boolean compareAndSet(UUID txId, @Nullable TxState txStateExpected, TxMeta txMeta, long commandIndex) {
+        // Stores txMeta when the state kept for txId equals txStateExpected (null stands for "nothing stored").
+        // Returns true if txMeta was written, or if the kept meta already has the same state and commit timestamp.
+        // In every case commandIndex is recorded as the last applied index.
+        TxMeta old = storage.get(txId);
+
+        boolean result;
+
+        if (old == null) {
+            // No stored meta: succeed only if the caller expected none and this call performs the insert.
+            result = txStateExpected == null && storage.putIfAbsent(txId, txMeta) == null;
+        } else if (old.txState() == txStateExpected) {
+            result = storage.replace(txId, old, txMeta);
+        } else {
+            // The expected state does not match: report success only if the stored meta already has the
+            // requested state and commit timestamp. The outcome is kept in result rather than returned here,
+            // and the null-safe comparison avoids an NPE when only the stored commit timestamp is null.
+            result = old.txState() == txMeta.txState() && (old.commitTimestamp() == null
+                    ? txMeta.commitTimestamp() == null
+                    : old.commitTimestamp().equals(txMeta.commitTimestamp()));
+        }
+
+        // Every outcome, including the mismatch branch above, must advance the applied index.
+        lastAppliedIndex = commandIndex;
+
+        return result;
     }
 
     @Override