You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2024/02/17 10:12:39 UTC

(ignite-3) branch main updated: IGNITE-18595 Implement index build process during the full state transfer (#3215)

This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 4468f976b5 IGNITE-18595 Implement index build process during the full state transfer (#3215)
4468f976b5 is described below

commit 4468f976b5db599ef0d205d8f30bbdd723df3a15
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Sat Feb 17 13:12:33 2024 +0300

    IGNITE-18595 Implement index build process during the full state transfer (#3215)
---
 .../ignite/internal/catalog/CatalogTestUtils.java  |   2 +-
 .../apache/ignite/internal/index/IndexBuilder.java |   1 -
 .../impl/StandaloneMetaStorageManager.java         |  20 +-
 .../internal/table/distributed/TableManager.java   |  24 +-
 .../snapshot/FullStateTransferIndexChooser.java    | 325 ++++++++++++++++++
 .../distributed/raft/snapshot/PartitionAccess.java |  22 +-
 .../raft/snapshot/PartitionAccessImpl.java         |  22 +-
 .../raft/snapshot/ReadOnlyIndexInfo.java           | 104 ++++++
 .../snapshot/incoming/IncomingSnapshotCopier.java  |   6 +-
 .../FullStateTransferIndexChooserTest.java         | 373 +++++++++++++++++++++
 .../raft/snapshot/PartitionAccessImplTest.java     | 137 +++++---
 .../incoming/IncomingSnapshotCopierTest.java       |  29 +-
 .../ignite/internal/table/TableTestUtils.java      |   4 +
 13 files changed, 974 insertions(+), 95 deletions(-)

diff --git a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
index 18146c6433..07f82632c7 100644
--- a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
+++ b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
@@ -62,7 +62,7 @@ public class CatalogTestUtils {
      * @param clock Hybrid clock.
      */
     public static CatalogManager createTestCatalogManager(String nodeName, HybridClock clock) {
-        StandaloneMetaStorageManager metastore = StandaloneMetaStorageManager.create(new SimpleInMemoryKeyValueStorage(nodeName));
+        StandaloneMetaStorageManager metastore = StandaloneMetaStorageManager.create(new SimpleInMemoryKeyValueStorage(nodeName), clock);
 
         var clockWaiter = new ClockWaiter(nodeName, clock);
 
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
index daa3f51375..c9e8d0c32d 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
@@ -98,7 +98,6 @@ class IndexBuilder implements ManuallyCloseable {
      *      to the replica.
      * @param creationCatalogVersion Catalog version in which the index was created.
      */
-    // TODO: IGNITE-19498 Perhaps we need to start building the index only once
     public void scheduleBuildIndex(
             int tableId,
             int partitionId,
diff --git a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
index 763e317215..9d8dcd73a9 100644
--- a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
+++ b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.configuration.ConfigurationValue;
 import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
@@ -78,6 +79,17 @@ public class StandaloneMetaStorageManager extends MetaStorageManagerImpl {
      * @param keyValueStorage Key-value storage.
      */
     public static StandaloneMetaStorageManager create(KeyValueStorage keyValueStorage) {
+        return create(keyValueStorage, new HybridClockImpl());
+    }
+
+    /**
+     * Creates standalone MetaStorage manager for provided key-value storage. The manager is responsible for starting/stopping provided
+     * key-value storage.
+     *
+     * @param keyValueStorage Key-value storage.
+     * @param clock Clock.
+     */
+    public static StandaloneMetaStorageManager create(KeyValueStorage keyValueStorage, HybridClock clock) {
         return new StandaloneMetaStorageManager(
                 mockClusterService(),
                 mockClusterGroupManager(),
@@ -85,7 +97,8 @@ public class StandaloneMetaStorageManager extends MetaStorageManagerImpl {
                 mockRaftManager(),
                 keyValueStorage,
                 mock(TopologyAwareRaftGroupServiceFactory.class),
-                mockConfiguration()
+                mockConfiguration(),
+                clock
         );
     }
 
@@ -105,7 +118,8 @@ public class StandaloneMetaStorageManager extends MetaStorageManagerImpl {
             RaftManager raftMgr,
             KeyValueStorage storage,
             TopologyAwareRaftGroupServiceFactory raftServiceFactory,
-            MetaStorageConfiguration configuration
+            MetaStorageConfiguration configuration,
+            HybridClock clock
     ) {
         super(
                 clusterService,
@@ -113,7 +127,7 @@ public class StandaloneMetaStorageManager extends MetaStorageManagerImpl {
                 logicalTopologyService,
                 raftMgr,
                 storage,
-                new HybridClockImpl(),
+                clock,
                 raftServiceFactory,
                 configuration
         );
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 4f440ce765..d0e972b101 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -156,6 +156,7 @@ import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
 import org.apache.ignite.internal.table.distributed.raft.RebalanceRaftGroupEventsListener;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccessImpl;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionSnapshotStorageFactory;
@@ -373,6 +374,9 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
     /** Marshallers provider. */
     private final ReflectionMarshallersProvider marshallers = new ReflectionMarshallersProvider();
 
+    /** Index chooser for full state transfer. */
+    private final FullStateTransferIndexChooser fullStateTransferIndexChooser;
+
     /**
      * Creates a new table manager.
      *
@@ -523,17 +527,21 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                 txStateStoragePool,
                 TX_STATE_STORAGE_FLUSH_DELAY_SUPPLIER
         );
+
+        fullStateTransferIndexChooser = new FullStateTransferIndexChooser(catalogService);
     }
 
     @Override
     public CompletableFuture<Void> start() {
-        inBusyLock(busyLock, () -> {
+        return inBusyLockAsync(busyLock, () -> {
             mvGc.start();
 
             lowWatermark.start();
 
             transactionStateResolver.start();
 
+            fullStateTransferIndexChooser.start();
+
             CompletableFuture<Long> recoveryFinishFuture = metaStorageMgr.recoveryFinishedFuture();
 
             assert recoveryFinishFuture.isDone();
@@ -571,9 +579,9 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
             });
 
             partitionReplicatorNodeRecovery.start();
-        });
 
-        return nullCompletedFuture();
+            return nullCompletedFuture();
+        });
     }
 
     private void processAssignmentsOnRecovery(long recoveryRevision) {
@@ -1037,7 +1045,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                         txStateTableStorage,
                         mvGc,
                         partitionUpdateHandlers.indexUpdateHandler,
-                        partitionUpdateHandlers.gcUpdateHandler
+                        partitionUpdateHandlers.gcUpdateHandler,
+                        fullStateTransferIndexChooser
                 ),
                 catalogService,
                 incomingSnapshotsExecutor
@@ -1081,6 +1090,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
         IgniteUtils.closeAllManually(
                 lowWatermark,
                 mvGc,
+                fullStateTransferIndexChooser,
                 () -> shutdownAndAwaitTermination(rebalanceScheduler, 10, TimeUnit.SECONDS),
                 () -> shutdownAndAwaitTermination(txStateStoragePool, 10, TimeUnit.SECONDS),
                 () -> shutdownAndAwaitTermination(txStateStorageScheduledPool, 10, TimeUnit.SECONDS),
@@ -1150,7 +1160,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
             long causalityToken,
             int catalogVersion,
             CatalogTableDescriptor tableDescriptor,
-            // TODO: IGNITE-18595 We need to do something different to wait for indexes before full rebalancing
+            // TODO: IGNITE-19513 We need to do something different to wait for indexes before full rebalancing
             boolean onNodeRecovery
     ) {
         return inBusyLockAsync(busyLock, () -> {
@@ -1212,7 +1222,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
             CatalogZoneDescriptor zoneDescriptor,
             CompletableFuture<List<Set<Assignment>>> assignmentsFuture,
             int catalogVersion,
-            // TODO: IGNITE-18595 We need to do something different to wait for indexes before full rebalancing
+            // TODO: IGNITE-19513 We need to do something different to wait for indexes before full rebalancing
             boolean onNodeRecovery
     ) {
         String tableName = tableDescriptor.name();
@@ -1242,7 +1252,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
 
         var table = new TableImpl(internalTable, lockMgr, schemaVersions, marshallers, sql.get());
 
-        // TODO: IGNITE-18595 We need to do something different to wait for indexes before full rebalancing
+        // TODO: IGNITE-19513 We need to do something different to wait for indexes before full rebalancing
         table.addIndexesToWait(collectTableIndexIds(tableId, catalogVersion, onNodeRecovery));
 
         tablesByIdVv.update(causalityToken, (previous, e) -> inBusyLock(busyLock, () -> {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java
new file mode 100644
index 0000000000..4fbdd81b54
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.REGISTERED;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.STOPPING;
+import static org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_REMOVED;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.util.CollectionUtils.difference;
+import static org.apache.ignite.internal.util.CollectionUtils.view;
+import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockSafe;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
+import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
+import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/** Index chooser for full state transfer. */
+// TODO: IGNITE-21502 Deal with the case of dropping a table
+// TODO: IGNITE-21502 Stop writing to an index that was dropped while in a status preceding AVAILABLE
+// TODO: IGNITE-21514 Stop writing to indexes that are destroyed during catalog compaction
+public class FullStateTransferIndexChooser implements ManuallyCloseable {
+    private final CatalogService catalogService;
+
+    private final NavigableSet<ReadOnlyIndexInfo> readOnlyIndexes = new ConcurrentSkipListSet<>();
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    /** Creates an index chooser backed by the given catalog service. */
+    public FullStateTransferIndexChooser(CatalogService catalogService) {
+        this.catalogService = catalogService;
+    }
+
+    /** Starts the component: subscribes to index removal events, then recovers read-only indexes from the catalog. */
+    public void start() {
+        inBusyLockSafe(busyLock, () -> {
+            addListenersBusy();
+
+            recoverReadOnlyIndexesBusy();
+        });
+    }
+
+    @Override
+    public void close() {
+        if (!closeGuard.compareAndSet(false, true)) {
+            return; // Already closed by an earlier call.
+        }
+
+        busyLock.block(); // NOTE(review): presumably stops new inBusyLock* sections from being entered - confirm in IgniteSpinBusyLock.
+
+        readOnlyIndexes.clear();
+    }
+
+    /**
+     * Collects indexes for {@link PartitionAccess#addWrite(RowId, BinaryRow, UUID, int, int, int)} (write intent).
+     *
+     * <p>Index selection algorithm (an index is chosen if any of the conditions holds):</p>
+     * <ul>
+     *     <li>If the index in the snapshot catalog version is in status {@link CatalogIndexStatus#BUILDING},
+     *     {@link CatalogIndexStatus#AVAILABLE} or {@link CatalogIndexStatus#STOPPING}.</li>
+     *     <li>If the index is in status {@link CatalogIndexStatus#REGISTERED} in the snapshot catalog version and is also in this
+     *     status in the catalog version that is active at {@code beginTs}.</li>
+     *     <li>For a read-only index, if {@code beginTs} is strictly less than the activation time of dropping the index.</li>
+     * </ul>
+     *
+     * @param catalogVersion Catalog version of the incoming partition snapshot.
+     * @param tableId Table ID for which indexes will be chosen.
+     * @param beginTs Begin timestamp of the transaction.
+     * @return List of index IDs sorted in ascending order.
+     */
+    public List<Integer> chooseForAddWrite(int catalogVersion, int tableId, HybridTimestamp beginTs) {
+        return inBusyLock(busyLock, () -> {
+            int activeCatalogVersionAtBeginTxTs = catalogService.activeCatalogVersion(beginTs.longValue());
+
+            List<Integer> fromCatalog = chooseFromCatalogBusy(catalogVersion, tableId, index -> {
+                if (index.status() == REGISTERED) {
+                    CatalogIndexDescriptor indexAtBeginTs = catalogService.index(index.id(), activeCatalogVersionAtBeginTxTs);
+
+                    return indexAtBeginTs != null && indexAtBeginTs.status() == REGISTERED; // Null: no such index at beginTs.
+                }
+
+                return true; // Any other status that reaches the filter is accepted as is.
+            });
+
+            List<Integer> fromReadOnlyIndexes = chooseFromReadOnlyIndexesBusy(tableId, beginTs);
+
+            return mergeWithoutDuplicates(fromCatalog, fromReadOnlyIndexes);
+        });
+    }
+
+    /**
+     * Collects indexes for {@link PartitionAccess#addWriteCommitted(RowId, BinaryRow, HybridTimestamp, int)} (write committed only).
+     *
+     * <p>Index selection algorithm (an index is chosen if any of the conditions holds):</p>
+     * <ul>
+     *     <li>If the index in the snapshot catalog version is in status {@link CatalogIndexStatus#BUILDING},
+     *     {@link CatalogIndexStatus#AVAILABLE} or {@link CatalogIndexStatus#STOPPING}.</li>
+     *     <li>For a read-only index, if {@code commitTs} is strictly less than the activation time of dropping the index.</li>
+     * </ul>
+     *
+     * @param catalogVersion Catalog version of the incoming partition snapshot.
+     * @param tableId Table ID for which indexes will be chosen.
+     * @param commitTs Timestamp to associate with committed value.
+     * @return List of index IDs sorted in ascending order.
+     */
+    public List<Integer> chooseForAddWriteCommitted(int catalogVersion, int tableId, HybridTimestamp commitTs) {
+        return inBusyLock(busyLock, () -> {
+            // Unlike write intents, committed writes never go to indexes that are still REGISTERED in the snapshot catalog version.
+            List<Integer> fromCatalog = chooseFromCatalogBusy(catalogVersion, tableId, index -> index.status() != REGISTERED);
+
+            List<Integer> fromReadOnlyIndexes = chooseFromReadOnlyIndexesBusy(tableId, commitTs);
+
+            return mergeWithoutDuplicates(fromCatalog, fromReadOnlyIndexes);
+        });
+    }
+
+    private List<Integer> chooseFromCatalogBusy(int catalogVersion, int tableId, Predicate<CatalogIndexDescriptor> filter) {
+        List<CatalogIndexDescriptor> indexes = catalogService.indexes(catalogVersion, tableId);
+
+        if (indexes.isEmpty()) {
+            return List.of();
+        }
+
+        var result = new ArrayList<CatalogIndexDescriptor>(indexes.size());
+
+        for (CatalogIndexDescriptor index : indexes) {
+            switch (index.status()) { // Statuses not listed below are rejected in "default".
+                case REGISTERED:
+                case BUILDING:
+                case AVAILABLE:
+                case STOPPING:
+                    if (filter.test(index)) { // The caller-supplied filter makes the final decision.
+                        result.add(index);
+                    }
+                    break;
+                default:
+                    throw new IllegalStateException("Unknown index status: " + index.status());
+            }
+        }
+
+        return view(result, CatalogObjectDescriptor::id); // NOTE(review): presumably maps descriptors to IDs keeping order - confirm.
+    }
+
+    private List<Integer> chooseFromReadOnlyIndexesBusy(int tableId, HybridTimestamp fromTsExcluded) {
+        ReadOnlyIndexInfo fromKeyIncluded = new ReadOnlyIndexInfo(tableId, fromTsExcluded.longValue() + 1, 0); // +1: exclusive -> inclusive.
+        ReadOnlyIndexInfo toKeyExcluded = new ReadOnlyIndexInfo(tableId + 1, 0, 0);
+
+        // NOTE(review): the range query assumes ReadOnlyIndexInfo is ordered by table ID first, then by timestamp - confirm in compareTo.
+        NavigableSet<ReadOnlyIndexInfo> subSet = readOnlyIndexes.subSet(fromKeyIncluded, true, toKeyExcluded, false);
+
+        if (subSet.isEmpty()) {
+            return List.of();
+        }
+
+        return subSet.stream().map(ReadOnlyIndexInfo::indexId).sorted().collect(toList()); // Callers return IDs in ascending order.
+    }
+
+    private static List<Integer> mergeWithoutDuplicates(List<Integer> l0, List<Integer> l1) {
+        if (l0.isEmpty()) {
+            return l1; // Nothing to merge: the other list is returned as is, not copied.
+        } else if (l1.isEmpty()) {
+            return l0;
+        }
+
+        var result = new ArrayList<Integer>(l0.size() + l1.size()); // Two-cursor merge; both inputs are expected to be ascending.
+
+        for (int i0 = 0, i1 = 0; i0 < l0.size() || i1 < l1.size(); ) {
+            if (i0 >= l0.size()) { // l0 is exhausted: drain l1.
+                result.add(l1.get(i1++));
+            } else if (i1 >= l1.size()) { // l1 is exhausted: drain l0.
+                result.add(l0.get(i0++));
+            } else {
+                Integer indexId0 = l0.get(i0);
+                Integer indexId1 = l1.get(i1);
+
+                if (indexId0 < indexId1) {
+                    result.add(indexId0);
+                    i0++;
+                } else if (indexId0 > indexId1) {
+                    result.add(indexId1);
+                    i1++;
+                } else {
+                    result.add(indexId0); // Same ID in both lists: keep one copy and advance both cursors.
+                    i0++;
+                    i1++;
+                }
+            }
+        }
+
+        return result;
+    }
+
+    private void addListenersBusy() {
+        catalogService.listen(INDEX_REMOVED, (parameters, exception) -> {
+            if (exception != null) {
+                return failedFuture(exception); // Propagate the failure reported with the event.
+            }
+
+            // NOTE(review): the "false" result presumably keeps this listener registered - confirm against CatalogService#listen.
+            return onIndexRemoved((RemoveIndexEventParameters) parameters).thenApply(unused -> false);
+        });
+    }
+
+    private CompletableFuture<?> onIndexRemoved(RemoveIndexEventParameters parameters) {
+        return inBusyLockAsync(busyLock, () -> {
+            int indexId = parameters.indexId();
+            int catalogVersion = parameters.catalogVersion();
+
+            CatalogIndexDescriptor index = indexBusy(indexId, catalogVersion - 1); // Descriptor from the version before the event's one.
+
+            if (index.status() == AVAILABLE) {
+                // Removed straight from AVAILABLE (table drop): the activation time of the event's catalog version is recorded.
+                readOnlyIndexes.add(new ReadOnlyIndexInfo(index, catalogActivationTimestampBusy(catalogVersion)));
+            } else if (index.status() == STOPPING) { // Recorded with the time it became STOPPING, not the removal time.
+                readOnlyIndexes.add(new ReadOnlyIndexInfo(index, findStoppingActivationTsBusy(indexId, catalogVersion - 1)));
+            }
+
+            return nullCompletedFuture();
+        });
+    }
+
+    private long catalogActivationTimestampBusy(int catalogVersion) {
+        Catalog catalog = catalogService.catalog(catalogVersion);
+
+        assert catalog != null : catalogVersion;
+
+        return catalog.time(); // Used by the callers as the activation timestamp of this catalog version.
+    }
+
+    private void recoverReadOnlyIndexesBusy() {
+        int earliestCatalogVersion = catalogService.earliestCatalogVersion();
+        int latestCatalogVersion = catalogService.latestCatalogVersion();
+
+        var readOnlyIndexById = new HashMap<Integer, ReadOnlyIndexInfo>(); // computeIfAbsent below keeps the first timestamp found.
+        var previousCatalogVersionTableIds = Set.<Integer>of();
+
+        // TODO: IGNITE-21514 Deal with catalog compaction
+        for (int catalogVersion = earliestCatalogVersion; catalogVersion <= latestCatalogVersion; catalogVersion++) {
+            long activationTs = catalogActivationTimestampBusy(catalogVersion);
+
+            catalogService.indexes(catalogVersion).stream()
+                    .filter(index -> index.status() == STOPPING)
+                    .forEach(index -> readOnlyIndexById.computeIfAbsent(index.id(), i -> new ReadOnlyIndexInfo(index, activationTs)));
+
+            Set<Integer> currentCatalogVersionTableIds = tableIds(catalogVersion);
+
+            // Here we look for indexes that transitioned directly from AVAILABLE to [deleted] (corresponding to the logical READ_ONLY
+            // state), as such transitions only happen when a table is dropped.
+            int finalCatalogVersion = catalogVersion; // Effectively final copy for the lambda below.
+            difference(previousCatalogVersionTableIds, currentCatalogVersionTableIds).stream()
+                    .flatMap(droppedTableId -> catalogService.indexes(finalCatalogVersion - 1, droppedTableId).stream())
+                    .filter(index -> index.status() == AVAILABLE)
+                    .forEach(index -> readOnlyIndexById.computeIfAbsent(index.id(), i -> new ReadOnlyIndexInfo(index, activationTs)));
+
+            previousCatalogVersionTableIds = currentCatalogVersionTableIds;
+        }
+
+        readOnlyIndexes.addAll(readOnlyIndexById.values());
+    }
+
+    private CatalogIndexDescriptor indexBusy(int indexId, int catalogVersion) {
+        CatalogIndexDescriptor index = catalogService.index(indexId, catalogVersion);
+
+        assert index != null : "indexId=" + indexId + ", catalogVersion=" + catalogVersion;
+
+        return index; // Non-null whenever assertions are enabled.
+    }
+
+    private long findStoppingActivationTsBusy(int indexId, int toCatalogVersionIncluded) {
+        int earliestCatalogVersion = catalogService.earliestCatalogVersion();
+
+        for (int catalogVersion = toCatalogVersionIncluded; catalogVersion >= earliestCatalogVersion; catalogVersion--) {
+            if (indexBusy(indexId, catalogVersion).status() == AVAILABLE) { // Scanning backwards: the latest AVAILABLE version.
+                return catalogActivationTimestampBusy(catalogVersion + 1); // The version right after it.
+            }
+        }
+
+        throw new AssertionError(format(
+                "{} status activation timestamp was not found for index: [indexId={}, toCatalogVersionIncluded={}]",
+                STOPPING, indexId, toCatalogVersionIncluded
+        ));
+    }
+
+    private Set<Integer> tableIds(int catalogVersion) { // IDs of the tables the catalog service returns for this version.
+        return catalogService.tables(catalogVersion).stream().map(CatalogObjectDescriptor::id).collect(toSet());
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
index e6f8230a24..06e25d346a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
@@ -85,32 +85,34 @@ public interface PartitionAccess {
     @Nullable RaftGroupConfiguration committedGroupConfiguration();
 
     /**
-     * Creates (or replaces) an uncommitted (aka pending) version, assigned to the given transaction id. In details: - if there is no
+     * Creates (or replaces) an uncommitted (aka pending) version, assigned to the given transaction ID. In details: - if there is no
      * uncommitted version, a new uncommitted version is added - if there is an uncommitted version belonging to the same transaction, it
      * gets replaced by the given version - if there is an uncommitted version belonging to a different transaction,
      * {@link TxIdMismatchException} is thrown
      *
-     * @param rowId Row id.
+     * @param rowId Row ID.
      * @param row Table row to update. Key only row means value removal.
-     * @param txId Transaction id.
-     * @param commitTableId Commit table id.
+     * @param txId Transaction ID.
+     * @param commitTableId Commit table ID.
      * @param commitPartitionId Commit partitionId.
-     * @throws TxIdMismatchException If there's another pending update associated with different transaction id.
+     * @param catalogVersion Catalog version of the incoming partition snapshot.
+     * @throws TxIdMismatchException If there's another pending update associated with different transaction ID.
      * @throws StorageException If failed to write data.
      */
-    void addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, int commitTableId, int commitPartitionId);
+    void addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, int commitTableId, int commitPartitionId, int catalogVersion);
 
     /**
      * Creates a committed version. In details: - if there is no uncommitted version, a new committed version is added - if there is an
      * uncommitted version, this method may fail with a system exception (this method should not be called if there is already something
      * uncommitted for the given row).
      *
-     * @param rowId Row id.
+     * @param rowId Row ID.
      * @param row Table row to update. Key only row means value removal.
      * @param commitTimestamp Timestamp to associate with committed value.
+     * @param catalogVersion Catalog version of the incoming partition snapshot.
      * @throws StorageException If failed to write data.
      */
-    void addWriteCommitted(RowId rowId, @Nullable BinaryRow row, HybridTimestamp commitTimestamp);
+    void addWriteCommitted(RowId rowId, @Nullable BinaryRow row, HybridTimestamp commitTimestamp, int catalogVersion);
 
     /**
      * Returns the minimum applied index of the partition storages.
@@ -147,8 +149,8 @@ public interface PartitionAccess {
      *         <li>{@link #maxLastAppliedTerm()};</li>
      *         <li>{@link #committedGroupConfiguration()};</li>
      *         <li>{@link #addTxMeta(UUID, TxMeta)};</li>
-     *         <li>{@link #addWrite(RowId, BinaryRow, UUID, int, int)};</li>
-     *         <li>{@link #addWriteCommitted(RowId, BinaryRow, HybridTimestamp)}.</li>
+     *         <li>{@link #addWrite(RowId, BinaryRow, UUID, int, int, int)};</li>
+     *         <li>{@link #addWriteCommitted(RowId, BinaryRow, HybridTimestamp, int)}.</li>
      *     </ul></li>
      * </ul>
      *
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
index cddedf3bb1..933576e9c3 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.table.distributed.raft.snapshot;
 
 import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.tx.TransactionIds.beginTimestamp;
 
 import java.util.List;
 import java.util.UUID;
@@ -60,6 +61,8 @@ public class PartitionAccessImpl implements PartitionAccess {
 
     private final GcUpdateHandler gcUpdateHandler;
 
+    private final FullStateTransferIndexChooser fullStateTransferIndexChooser;
+
     /**
      * Constructor.
      *
@@ -69,6 +72,7 @@ public class PartitionAccessImpl implements PartitionAccess {
      * @param mvGc Garbage collector for multi-versioned storages and their indexes in the background.
      * @param indexUpdateHandler Index update handler.
      * @param gcUpdateHandler Gc update handler.
+     * @param fullStateTransferIndexChooser Index chooser for full state transfer.
      */
     public PartitionAccessImpl(
             PartitionKey partitionKey,
@@ -76,7 +80,8 @@ public class PartitionAccessImpl implements PartitionAccess {
             TxStateTableStorage txStateTableStorage,
             MvGc mvGc,
             IndexUpdateHandler indexUpdateHandler,
-            GcUpdateHandler gcUpdateHandler
+            GcUpdateHandler gcUpdateHandler,
+            FullStateTransferIndexChooser fullStateTransferIndexChooser
     ) {
         this.partitionKey = partitionKey;
         this.mvTableStorage = mvTableStorage;
@@ -84,6 +89,7 @@ public class PartitionAccessImpl implements PartitionAccess {
         this.mvGc = mvGc;
         this.indexUpdateHandler = indexUpdateHandler;
         this.gcUpdateHandler = gcUpdateHandler;
+        this.fullStateTransferIndexChooser = fullStateTransferIndexChooser;
     }
 
     @Override
@@ -135,32 +141,34 @@ public class PartitionAccessImpl implements PartitionAccess {
     }
 
     @Override
-    public void addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, int commitTableId, int commitPartitionId) {
+    public void addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, int commitTableId, int commitPartitionId, int catalogVersion) {
         MvPartitionStorage mvPartitionStorage = getMvPartitionStorage(partitionId());
 
+        List<Integer> indexIds = fullStateTransferIndexChooser.chooseForAddWrite(catalogVersion, tableId(), beginTimestamp(txId));
+
         mvPartitionStorage.runConsistently(locker -> {
             locker.lock(rowId);
 
             mvPartitionStorage.addWrite(rowId, row, txId, commitTableId, commitPartitionId);
 
-            // TODO: IGNITE-18595 We need to know the indexes for a full rebalance, i.e. null must go
-            indexUpdateHandler.addToIndexes(row, rowId, null);
+            indexUpdateHandler.addToIndexes(row, rowId, indexIds);
 
             return null;
         });
     }
 
     @Override
-    public void addWriteCommitted(RowId rowId, @Nullable BinaryRow row, HybridTimestamp commitTimestamp) {
+    public void addWriteCommitted(RowId rowId, @Nullable BinaryRow row, HybridTimestamp commitTimestamp, int catalogVersion) {
         MvPartitionStorage mvPartitionStorage = getMvPartitionStorage(partitionId());
 
+        List<Integer> indexIds = fullStateTransferIndexChooser.chooseForAddWriteCommitted(catalogVersion, tableId(), commitTimestamp);
+
         mvPartitionStorage.runConsistently(locker -> {
             locker.lock(rowId);
 
             mvPartitionStorage.addWriteCommitted(rowId, row, commitTimestamp);
 
-            // TODO: IGNITE-18595 We need to know the indexes for a full rebalance, i.e. null must go
-            indexUpdateHandler.addToIndexes(row, rowId, null);
+            indexUpdateHandler.addToIndexes(row, rowId, indexIds);
 
             return null;
         });
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/ReadOnlyIndexInfo.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/ReadOnlyIndexInfo.java
new file mode 100644
index 0000000000..9fc733fbc1
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/ReadOnlyIndexInfo.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot;
+
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
+import org.apache.ignite.internal.tostring.S;
+
+/** Internal class for use in {@link FullStateTransferIndexChooser} for read-only indexes. */
+final class ReadOnlyIndexInfo implements Comparable<ReadOnlyIndexInfo> {
+    private final int tableId;
+
+    /**
+     * Timestamp of activation of the catalog version in which the index got the status {@link CatalogIndexStatus#STOPPING} or the table was
+     * dropped and the index had the status {@link CatalogIndexStatus#AVAILABLE}.
+     */
+    private final long activationTs;
+
+    private final int indexId;
+
+    public ReadOnlyIndexInfo(CatalogIndexDescriptor index, long activationTs) {
+        this(index.tableId(), activationTs, index.id());
+    }
+
+    ReadOnlyIndexInfo(int tableId, long activationTs, int indexId) {
+        this.tableId = tableId;
+        this.activationTs = activationTs;
+        this.indexId = indexId;
+    }
+
+    int tableId() {
+        return tableId;
+    }
+
+    long activationTs() {
+        return activationTs;
+    }
+
+    int indexId() {
+        return indexId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        ReadOnlyIndexInfo other = (ReadOnlyIndexInfo) o;
+
+        return tableId == other.tableId
+                && activationTs == other.activationTs
+                && indexId == other.indexId;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = tableId;
+        result = 31 * result + Long.hashCode(activationTs);
+        result = 31 * result + indexId;
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(this);
+    }
+
+    /** Orders by table ID, then by activation timestamp, then by index ID; consistent with {@link #equals(Object)}. */
+    @Override
+    public int compareTo(ReadOnlyIndexInfo other) {
+        int cmp = Integer.compare(tableId, other.tableId);
+
+        if (cmp != 0) {
+            return cmp;
+        }
+
+        cmp = Long.compare(activationTs, other.activationTs);
+
+        if (cmp != 0) {
+            return cmp;
+        }
+
+        return Integer.compare(indexId, other.indexId);
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
index 72eb1c4aaf..ab59b244cf 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
@@ -492,16 +492,18 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
 
         PartitionAccess partition = partitionSnapshotStorage.partition();
 
+        int snapshotCatalogVersion = snapshotMeta.requiredCatalogVersion();
+
         if (i == entry.timestamps().length) {
             // Writes an intent to write (uncommitted version).
             assert entry.txId() != null;
             assert entry.commitTableId() != null;
             assert entry.commitPartitionId() != ReadResult.UNDEFINED_COMMIT_PARTITION_ID;
 
-            partition.addWrite(rowId, binaryRow, entry.txId(), entry.commitTableId(), entry.commitPartitionId());
+            partition.addWrite(rowId, binaryRow, entry.txId(), entry.commitTableId(), entry.commitPartitionId(), snapshotCatalogVersion);
         } else {
             // Writes committed version.
-            partition.addWriteCommitted(rowId, binaryRow, hybridTimestamp(entry.timestamps()[i]));
+            partition.addWriteCommitted(rowId, binaryRow, hybridTimestamp(entry.timestamps()[i]), snapshotCatalogVersion);
         }
     }
 }
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooserTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooserTest.java
new file mode 100644
index 0000000000..7e82ed370b
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooserTest.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot;
+
+import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.BUILDING;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.REGISTERED;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.STOPPING;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static org.apache.ignite.internal.table.TableTestUtils.INDEX_NAME;
+import static org.apache.ignite.internal.table.TableTestUtils.PK_INDEX_NAME;
+import static org.apache.ignite.internal.table.TableTestUtils.TABLE_NAME;
+import static org.apache.ignite.internal.table.TableTestUtils.createSimpleHashIndex;
+import static org.apache.ignite.internal.table.TableTestUtils.createSimpleTable;
+import static org.apache.ignite.internal.table.TableTestUtils.getIndexIdStrict;
+import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict;
+import static org.apache.ignite.internal.table.TableTestUtils.makeIndexAvailable;
+import static org.apache.ignite.internal.table.TableTestUtils.startBuildingIndex;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.util.List;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.CatalogTestUtils;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/** For {@link FullStateTransferIndexChooser} testing. */
+public class FullStateTransferIndexChooserTest extends BaseIgniteAbstractTest {
+    private static final String REGISTERED_INDEX_NAME = INDEX_NAME + "_" + REGISTERED;
+
+    private static final String BUILDING_INDEX_NAME = INDEX_NAME + "_" + BUILDING;
+
+    private static final String AVAILABLE_INDEX_NAME = INDEX_NAME + "_" + AVAILABLE;
+
+    private static final String STOPPING_INDEX_NAME = INDEX_NAME + "_" + STOPPING;
+
+    private static final String READ_ONLY_INDEX_NAME = INDEX_NAME + "_READ_ONLY";
+
+    private final HybridClock clock = new HybridClockImpl();
+
+    private CatalogManager catalogManager;
+
+    private FullStateTransferIndexChooser indexChooser;
+
+    @BeforeEach
+    void setUp() {
+        catalogManager = CatalogTestUtils.createTestCatalogManager("test", clock);
+
+        indexChooser = new FullStateTransferIndexChooser(catalogManager);
+
+        assertThat(catalogManager.start(), willCompleteSuccessfully());
+
+        indexChooser.start();
+
+        createSimpleTable(catalogManager, TABLE_NAME);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        closeAllManually(
+                indexChooser,
+                catalogManager::beforeNodeStop,
+                catalogManager::stop
+        );
+    }
+
+    @Test
+    void chooseForAddWriteWithSecondaryAndWithoutReadOnlyIndexes() {
+        int pkIndexId = indexId(PK_INDEX_NAME);
+        assertThat(chooseForAddWriteLatest(), contains(pkIndexId));
+
+        createSimpleRegisteredIndex(REGISTERED_INDEX_NAME);
+        int registeredIndexId = indexId(REGISTERED_INDEX_NAME);
+        assertThat(chooseForAddWriteLatest(), contains(pkIndexId, registeredIndexId));
+
+        createSimpleBuildingIndex(BUILDING_INDEX_NAME);
+        int buildingIndexId = indexId(BUILDING_INDEX_NAME);
+        assertThat(chooseForAddWriteLatest(), contains(pkIndexId, registeredIndexId, buildingIndexId));
+
+        createSimpleAvailableIndex(AVAILABLE_INDEX_NAME);
+        int availableIndexId = indexId(AVAILABLE_INDEX_NAME);
+        assertThat(chooseForAddWriteLatest(), contains(pkIndexId, registeredIndexId, buildingIndexId, availableIndexId));
+
+        createSimpleStoppingIndex(STOPPING_INDEX_NAME);
+        int stoppingIndexId = indexId(STOPPING_INDEX_NAME);
+        assertThat(chooseForAddWriteLatest(), contains(pkIndexId, registeredIndexId, buildingIndexId, availableIndexId, stoppingIndexId));
+    }
+
+    @Test
+    void chooseForAddWriteWithSecondaryAndWithoutReadOnlyAndRegisteredIndexes() {
+        HybridTimestamp beginTsBeforeCreateRegisteredIndex = clock.now();
+
+        createSimpleRegisteredIndex(REGISTERED_INDEX_NAME);
+
+        int pkIndexId = indexId(PK_INDEX_NAME);
+        int registeredIndexId = indexId(REGISTERED_INDEX_NAME);
+
+        assertThat(chooseForAddWriteLatest(beginTsBeforeCreateRegisteredIndex), contains(pkIndexId));
+
+        int catalogVersionBeforeStartBuildingIndex = latestCatalogVersion();
+
+        startBuildingIndex(catalogManager, registeredIndexId);
+
+        assertThat(
+                indexChooser.chooseForAddWrite(catalogVersionBeforeStartBuildingIndex, tableId(TABLE_NAME), clock.now()),
+                contains(pkIndexId)
+        );
+    }
+
+    @Test
+    void chooseForAddWriteCommittedWithSecondaryAndWithoutReadOnlyIndexes() {
+        int pkIndexId = indexId(PK_INDEX_NAME);
+        assertThat(chooseForAddWriteCommittedLatest(), contains(pkIndexId));
+
+        createSimpleRegisteredIndex(REGISTERED_INDEX_NAME);
+        assertThat(chooseForAddWriteCommittedLatest(), contains(pkIndexId));
+
+        createSimpleBuildingIndex(BUILDING_INDEX_NAME);
+        int buildingIndexId = indexId(BUILDING_INDEX_NAME);
+        assertThat(chooseForAddWriteCommittedLatest(), contains(pkIndexId, buildingIndexId));
+
+        createSimpleAvailableIndex(AVAILABLE_INDEX_NAME);
+        int availableIndexId = indexId(AVAILABLE_INDEX_NAME);
+        assertThat(chooseForAddWriteCommittedLatest(), contains(pkIndexId, buildingIndexId, availableIndexId));
+
+        createSimpleStoppingIndex(STOPPING_INDEX_NAME);
+        int stoppingIndexId = indexId(STOPPING_INDEX_NAME);
+        assertThat(chooseForAddWriteCommittedLatest(), contains(pkIndexId, buildingIndexId, availableIndexId, stoppingIndexId));
+    }
+
+    @ParameterizedTest(name = "recovery = {0}")
+    @ValueSource(booleans = {false, true})
+    void chooseForAddWriteCommittedWithSecondaryAndReadOnlyIndexes(boolean recovery) {
+        createSimpleRegisteredIndex(REGISTERED_INDEX_NAME);
+        createSimpleBuildingIndex(BUILDING_INDEX_NAME);
+
+        HybridTimestamp commitTsBeforeStoppingIndex = clock.now();
+
+        createSimpleStoppingIndex(READ_ONLY_INDEX_NAME);
+
+        HybridTimestamp commitTsOnStoppingIndex = latestCatalogVersionActivationTs();
+
+        int pkIndexId = indexId(PK_INDEX_NAME);
+        int readOnlyIndexId = indexId(READ_ONLY_INDEX_NAME);
+
+        dropIndex(REGISTERED_INDEX_NAME);
+        dropIndex(BUILDING_INDEX_NAME);
+        removeIndex(READ_ONLY_INDEX_NAME);
+
+        if (recovery) {
+            recoverIndexChooser();
+        }
+
+        assertThat(chooseForAddWriteCommittedLatest(commitTsBeforeStoppingIndex), contains(pkIndexId, readOnlyIndexId));
+        assertThat(chooseForAddWriteCommittedLatest(commitTsOnStoppingIndex), contains(pkIndexId));
+        assertThat(chooseForAddWriteCommittedLatest(clock.now()), contains(pkIndexId));
+    }
+
+    @ParameterizedTest(name = "recovery = {0}")
+    @ValueSource(booleans = {false, true})
+    void chooseForAddWriteWithSecondaryAndReadOnlyIndexes(boolean recovery) {
+        createSimpleRegisteredIndex(REGISTERED_INDEX_NAME);
+        createSimpleBuildingIndex(BUILDING_INDEX_NAME);
+
+        HybridTimestamp beginTsBeforeStoppingIndex = clock.now();
+
+        createSimpleStoppingIndex(READ_ONLY_INDEX_NAME);
+
+        HybridTimestamp beginTsOnStoppingIndex = latestCatalogVersionActivationTs();
+
+        int pkIndexId = indexId(PK_INDEX_NAME);
+        int readOnlyIndexId = indexId(READ_ONLY_INDEX_NAME);
+
+        dropIndex(REGISTERED_INDEX_NAME);
+        dropIndex(BUILDING_INDEX_NAME);
+        removeIndex(READ_ONLY_INDEX_NAME);
+
+        if (recovery) {
+            recoverIndexChooser();
+        }
+
+        assertThat(chooseForAddWriteLatest(beginTsBeforeStoppingIndex), contains(pkIndexId, readOnlyIndexId));
+        assertThat(chooseForAddWriteLatest(beginTsOnStoppingIndex), contains(pkIndexId));
+        assertThat(chooseForAddWriteLatest(clock.now()), contains(pkIndexId));
+    }
+
+    /** Checks the indexes chosen for committed writes to a dropped table, with or without re-creating the chooser. */
+    @ParameterizedTest(name = "recovery = {0}")
+    @ValueSource(booleans = {false, true})
+    void chooseForAddWriteCommittedForDroppedTable(boolean recovery) {
+        HybridTimestamp commitTsBeforeCreateIndexes = clock.now();
+
+        createSimpleRegisteredIndex(REGISTERED_INDEX_NAME);
+        createSimpleBuildingIndex(BUILDING_INDEX_NAME);
+        createSimpleAvailableIndex(AVAILABLE_INDEX_NAME);
+        createSimpleStoppingIndex(STOPPING_INDEX_NAME);
+
+        int pkIndexId = indexId(PK_INDEX_NAME);
+        int availableIndexId = indexId(AVAILABLE_INDEX_NAME);
+        int stoppingIndexId = indexId(STOPPING_INDEX_NAME);
+        int tableId = tableId(TABLE_NAME);
+
+        HybridTimestamp commitTsBeforeDropTable = clock.now();
+
+        dropTable();
+
+        if (recovery) {
+            recoverIndexChooser();
+        }
+
+        assertThat(
+                chooseForAddWriteCommittedLatest(tableId, commitTsBeforeCreateIndexes),
+                contains(pkIndexId, availableIndexId, stoppingIndexId)
+        );
+
+        assertThat(
+                chooseForAddWriteCommittedLatest(tableId, commitTsBeforeDropTable),
+                contains(pkIndexId, availableIndexId)
+        );
+
+        assertThat(chooseForAddWriteCommittedLatest(tableId, clock.now()), empty());
+    }
+
+    /** Checks the indexes chosen for write intents to a dropped table, with or without re-creating the chooser. */
+    @ParameterizedTest(name = "recovery = {0}")
+    @ValueSource(booleans = {false, true})
+    void chooseForAddWriteForDroppedTable(boolean recovery) {
+        HybridTimestamp beginTsBeforeCreateIndexes = clock.now();
+
+        createSimpleRegisteredIndex(REGISTERED_INDEX_NAME);
+        createSimpleBuildingIndex(BUILDING_INDEX_NAME);
+        createSimpleAvailableIndex(AVAILABLE_INDEX_NAME);
+        createSimpleStoppingIndex(STOPPING_INDEX_NAME);
+
+        int pkIndexId = indexId(PK_INDEX_NAME);
+        int availableIndexId = indexId(AVAILABLE_INDEX_NAME);
+        int stoppingIndexId = indexId(STOPPING_INDEX_NAME);
+        int tableId = tableId(TABLE_NAME);
+
+        HybridTimestamp beginTsBeforeDropTable = clock.now();
+
+        dropTable();
+
+        if (recovery) {
+            recoverIndexChooser();
+        }
+
+        assertThat(
+                chooseForAddWriteLatest(tableId, beginTsBeforeCreateIndexes),
+                contains(pkIndexId, availableIndexId, stoppingIndexId)
+        );
+
+        assertThat(
+                chooseForAddWriteLatest(tableId, beginTsBeforeDropTable),
+                contains(pkIndexId, availableIndexId)
+        );
+
+        assertThat(chooseForAddWriteLatest(tableId, clock.now()), empty());
+    }
+
+    private List<Integer> chooseForAddWriteCommittedLatest(int tableId, HybridTimestamp commitTs) {
+        return indexChooser.chooseForAddWriteCommitted(latestCatalogVersion(), tableId, commitTs);
+    }
+
+    private List<Integer> chooseForAddWriteCommittedLatest(HybridTimestamp commitTs) {
+        return chooseForAddWriteCommittedLatest(tableId(TABLE_NAME), commitTs);
+    }
+
+    private List<Integer> chooseForAddWriteCommittedLatest() {
+        return chooseForAddWriteCommittedLatest(HybridTimestamp.MAX_VALUE);
+    }
+
+    private List<Integer> chooseForAddWriteLatest(int tableId, HybridTimestamp beginTs) {
+        return indexChooser.chooseForAddWrite(latestCatalogVersion(), tableId, beginTs);
+    }
+
+    private List<Integer> chooseForAddWriteLatest(HybridTimestamp beginTs) {
+        return chooseForAddWriteLatest(tableId(TABLE_NAME), beginTs);
+    }
+
+    private List<Integer> chooseForAddWriteLatest() {
+        return chooseForAddWriteLatest(HybridTimestamp.MAX_VALUE);
+    }
+
+    private void createSimpleRegisteredIndex(String indexName) {
+        createSimpleHashIndex(catalogManager, TABLE_NAME, indexName);
+    }
+
+    private void createSimpleBuildingIndex(String indexName) {
+        createSimpleHashIndex(catalogManager, TABLE_NAME, indexName);
+        startBuildingIndex(catalogManager, indexId(indexName));
+    }
+
+    private void createSimpleAvailableIndex(String indexName) {
+        createSimpleBuildingIndex(indexName);
+        makeIndexAvailable(catalogManager, indexId(indexName));
+    }
+
+    private void createSimpleStoppingIndex(String indexName) {
+        createSimpleAvailableIndex(indexName);
+        dropIndex(indexName);
+    }
+
+    private void removeIndex(String indexName) {
+        TableTestUtils.removeIndex(catalogManager, indexName);
+    }
+
+    private void dropIndex(String indexName) {
+        TableTestUtils.dropIndex(catalogManager, DEFAULT_SCHEMA_NAME, indexName);
+    }
+
+    private int latestCatalogVersion() {
+        return catalogManager.latestCatalogVersion();
+    }
+
+    private HybridTimestamp latestCatalogVersionActivationTs() {
+        int catalogVersion = catalogManager.latestCatalogVersion();
+
+        Catalog catalog = catalogManager.catalog(catalogVersion);
+
+        assertNotNull(catalog, "catalogVersion=" + catalogVersion);
+
+        return hybridTimestamp(catalog.time());
+    }
+
+    private int tableId(String tableName) {
+        return getTableIdStrict(catalogManager, tableName, clock.nowLong());
+    }
+
+    private int indexId(String indexName) {
+        return getIndexIdStrict(catalogManager, indexName, clock.nowLong());
+    }
+
+    private void recoverIndexChooser() {
+        indexChooser.close();
+        indexChooser = new FullStateTransferIndexChooser(catalogManager);
+        indexChooser.start();
+    }
+
+    private void dropTable() {
+        TableTestUtils.dropTable(catalogManager, DEFAULT_SCHEMA_NAME, TABLE_NAME);
+    }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImplTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImplTest.java
index 64c6f40025..0d70af7f4e 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImplTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImplTest.java
@@ -18,16 +18,21 @@
 package org.apache.ignite.internal.table.distributed.raft.snapshot;
 
 import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.tx.TransactionIds.beginTimestamp;
+import static org.apache.ignite.internal.tx.TransactionIds.transactionId;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -41,12 +46,11 @@ import org.apache.ignite.internal.table.distributed.gc.MvGc;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 import org.apache.ignite.internal.tx.storage.state.test.TestTxStateTableStorage;
 import org.junit.jupiter.api.Test;
 
-/**
- * For {@link PartitionAccessImpl} testing.
- */
+/** For {@link PartitionAccessImpl} testing. */
 public class PartitionAccessImplTest extends BaseIgniteAbstractTest {
     private static final int TABLE_ID = 1;
 
@@ -60,14 +64,7 @@ public class PartitionAccessImplTest extends BaseIgniteAbstractTest {
         MvPartitionStorage mvPartitionStorage = createMvPartition(mvTableStorage, TEST_PARTITION_ID);
         TxStateStorage txStateStorage = txStateTableStorage.getOrCreateTxStateStorage(TEST_PARTITION_ID);
 
-        PartitionAccess partitionAccess = new PartitionAccessImpl(
-                testPartitionKey(),
-                mvTableStorage,
-                txStateTableStorage,
-                mock(MvGc.class),
-                mock(IndexUpdateHandler.class),
-                mock(GcUpdateHandler.class)
-        );
+        PartitionAccess partitionAccess = createPartitionAccessImpl(mvTableStorage, txStateTableStorage);
 
         assertEquals(0, partitionAccess.minLastAppliedIndex());
         assertEquals(0, partitionAccess.maxLastAppliedIndex());
@@ -106,14 +103,7 @@ public class PartitionAccessImplTest extends BaseIgniteAbstractTest {
         MvPartitionStorage mvPartitionStorage = createMvPartition(mvTableStorage, TEST_PARTITION_ID);
         TxStateStorage txStateStorage = txStateTableStorage.getOrCreateTxStateStorage(TEST_PARTITION_ID);
 
-        PartitionAccess partitionAccess = new PartitionAccessImpl(
-                testPartitionKey(),
-                mvTableStorage,
-                txStateTableStorage,
-                mock(MvGc.class),
-                mock(IndexUpdateHandler.class),
-                mock(GcUpdateHandler.class)
-        );
+        PartitionAccess partitionAccess = createPartitionAccessImpl(mvTableStorage, txStateTableStorage);
 
         assertEquals(0, partitionAccess.minLastAppliedTerm());
         assertEquals(0, partitionAccess.maxLastAppliedTerm());
@@ -148,36 +138,41 @@ public class PartitionAccessImplTest extends BaseIgniteAbstractTest {
 
         IndexUpdateHandler indexUpdateHandler = mock(IndexUpdateHandler.class);
 
-        PartitionAccess partitionAccess = new PartitionAccessImpl(
-                testPartitionKey(),
-                mvTableStorage,
-                new TestTxStateTableStorage(),
-                mock(MvGc.class),
-                indexUpdateHandler,
-                mock(GcUpdateHandler.class)
-        );
+        FullStateTransferIndexChooser fullStateTransferIndexChooser = mock(FullStateTransferIndexChooser.class);
+
+        PartitionAccess partitionAccess = createPartitionAccessImpl(mvTableStorage, indexUpdateHandler, fullStateTransferIndexChooser);
+
+        List<Integer> indexIds = List.of(1);
+
+        when(fullStateTransferIndexChooser.chooseForAddWrite(anyInt(), anyInt(), any())).thenReturn(indexIds);
 
         RowId rowId = new RowId(TEST_PARTITION_ID);
         BinaryRow binaryRow = mock(BinaryRow.class);
-        UUID txId = UUID.randomUUID();
+        UUID txId = transactionId(hybridTimestamp(System.currentTimeMillis()), 1);
         int commitTableId = 999;
+        int snapshotCatalogVersion = 666;
+        HybridTimestamp beginTs = beginTimestamp(txId);
+
+        partitionAccess.addWrite(rowId, binaryRow, txId, commitTableId, TEST_PARTITION_ID, snapshotCatalogVersion);
 
-        partitionAccess.addWrite(rowId, binaryRow, txId, commitTableId, TEST_PARTITION_ID);
+        verify(mvPartitionStorage).addWrite(eq(rowId), eq(binaryRow), eq(txId), eq(commitTableId), eq(TEST_PARTITION_ID));
 
-        verify(mvPartitionStorage, times(1)).addWrite(eq(rowId), eq(binaryRow), eq(txId), eq(commitTableId), eq(TEST_PARTITION_ID));
+        verify(fullStateTransferIndexChooser).chooseForAddWrite(eq(snapshotCatalogVersion), eq(TABLE_ID), eq(beginTs));
 
-        verify(indexUpdateHandler, times(1)).addToIndexes(eq(binaryRow), eq(rowId), isNull());
+        verify(indexUpdateHandler).addToIndexes(eq(binaryRow), eq(rowId), eq(indexIds));
 
         // Let's check with a null binaryRow.
         binaryRow = null;
 
-        reset(mvPartitionStorage, indexUpdateHandler);
+        clearInvocations(mvPartitionStorage, indexUpdateHandler, fullStateTransferIndexChooser);
 
-        partitionAccess.addWrite(rowId, binaryRow, txId, commitTableId, TEST_PARTITION_ID);
+        partitionAccess.addWrite(rowId, binaryRow, txId, commitTableId, TEST_PARTITION_ID, snapshotCatalogVersion);
 
-        verify(mvPartitionStorage, times(1)).addWrite(eq(rowId), eq(binaryRow), eq(txId), eq(commitTableId), eq(TEST_PARTITION_ID));
+        verify(mvPartitionStorage).addWrite(eq(rowId), eq(binaryRow), eq(txId), eq(commitTableId), eq(TEST_PARTITION_ID));
 
-        verify(indexUpdateHandler, times(1)).addToIndexes(eq(binaryRow), eq(rowId), isNull());
+        verify(fullStateTransferIndexChooser).chooseForAddWrite(eq(snapshotCatalogVersion), eq(TABLE_ID), eq(beginTs));
+
+        verify(indexUpdateHandler).addToIndexes(eq(binaryRow), eq(rowId), eq(indexIds));
     }
 
     @Test
@@ -188,34 +183,39 @@ public class PartitionAccessImplTest extends BaseIgniteAbstractTest {
 
         IndexUpdateHandler indexUpdateHandler = mock(IndexUpdateHandler.class);
 
-        PartitionAccess partitionAccess = new PartitionAccessImpl(
-                testPartitionKey(),
-                mvTableStorage,
-                new TestTxStateTableStorage(),
-                mock(MvGc.class),
-                indexUpdateHandler,
-                mock(GcUpdateHandler.class)
-        );
+        FullStateTransferIndexChooser fullStateTransferIndexChooser = mock(FullStateTransferIndexChooser.class);
+
+        PartitionAccess partitionAccess = createPartitionAccessImpl(mvTableStorage, indexUpdateHandler, fullStateTransferIndexChooser);
+
+        List<Integer> indexIds = List.of(1);
+
+        when(fullStateTransferIndexChooser.chooseForAddWriteCommitted(anyInt(), anyInt(), any())).thenReturn(indexIds);
 
         RowId rowId = new RowId(TEST_PARTITION_ID);
         BinaryRow binaryRow = mock(BinaryRow.class);
+        HybridTimestamp commitTimestamp = HybridTimestamp.MIN_VALUE.addPhysicalTime(100500);
+        int snapshotCatalogVersion = 666;
 
-        partitionAccess.addWriteCommitted(rowId, binaryRow, HybridTimestamp.MAX_VALUE);
+        partitionAccess.addWriteCommitted(rowId, binaryRow, commitTimestamp, snapshotCatalogVersion);
 
-        verify(mvPartitionStorage, times(1)).addWriteCommitted(eq(rowId), eq(binaryRow), eq(HybridTimestamp.MAX_VALUE));
+        verify(mvPartitionStorage).addWriteCommitted(eq(rowId), eq(binaryRow), eq(commitTimestamp));
 
-        verify(indexUpdateHandler, times(1)).addToIndexes(eq(binaryRow), eq(rowId), isNull());
+        verify(fullStateTransferIndexChooser).chooseForAddWriteCommitted(eq(snapshotCatalogVersion), eq(TABLE_ID), eq(commitTimestamp));
+
+        verify(indexUpdateHandler).addToIndexes(eq(binaryRow), eq(rowId), eq(indexIds));
 
         // Let's check with a null binaryRow.
         binaryRow = null;
 
-        reset(mvPartitionStorage, indexUpdateHandler);
+        clearInvocations(mvPartitionStorage, indexUpdateHandler, fullStateTransferIndexChooser);
+
+        partitionAccess.addWriteCommitted(rowId, binaryRow, commitTimestamp, snapshotCatalogVersion);
 
-        partitionAccess.addWriteCommitted(rowId, binaryRow, HybridTimestamp.MAX_VALUE);
+        verify(mvPartitionStorage).addWriteCommitted(eq(rowId), eq(binaryRow), eq(commitTimestamp));
 
-        verify(mvPartitionStorage, times(1)).addWriteCommitted(eq(rowId), eq(binaryRow), eq(HybridTimestamp.MAX_VALUE));
+        verify(fullStateTransferIndexChooser).chooseForAddWriteCommitted(eq(snapshotCatalogVersion), eq(TABLE_ID), eq(commitTimestamp));
 
-        verify(indexUpdateHandler, times(1)).addToIndexes(eq(binaryRow), eq(rowId), isNull());
+        verify(indexUpdateHandler).addToIndexes(eq(binaryRow), eq(rowId), eq(indexIds));
     }
 
     private static MvPartitionStorage createMvPartition(MvTableStorage tableStorage, int partitionId) {
@@ -225,4 +225,35 @@ public class PartitionAccessImplTest extends BaseIgniteAbstractTest {
 
         return createMvPartitionFuture.join();
     }
+
+    private static PartitionAccessImpl createPartitionAccessImpl(
+            MvTableStorage mvTableStorage,
+            TxStateTableStorage txStateTableStorage
+    ) {
+        return new PartitionAccessImpl(
+                testPartitionKey(),
+                mvTableStorage,
+                txStateTableStorage,
+                mock(MvGc.class),
+                mock(IndexUpdateHandler.class),
+                mock(GcUpdateHandler.class),
+                mock(FullStateTransferIndexChooser.class)
+        );
+    }
+
+    private static PartitionAccessImpl createPartitionAccessImpl(
+            MvTableStorage mvTableStorage,
+            IndexUpdateHandler indexUpdateHandler,
+            FullStateTransferIndexChooser fullStateTransferIndexChooser
+    ) {
+        return new PartitionAccessImpl(
+                testPartitionKey(),
+                mvTableStorage,
+                new TestTxStateTableStorage(),
+                mock(MvGc.class),
+                indexUpdateHandler,
+                mock(GcUpdateHandler.class),
+                fullStateTransferIndexChooser
+        );
+    }
 }
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index 89a534e11c..24f5fd881b 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -80,6 +80,7 @@ import org.apache.ignite.internal.table.distributed.gc.MvGc;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 import org.apache.ignite.internal.table.distributed.raft.RaftGroupConfiguration;
 import org.apache.ignite.internal.table.distributed.raft.RaftGroupConfigurationConverter;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccessImpl;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionSnapshotStorage;
@@ -92,6 +93,7 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh
 import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.TransactionIds;
 import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
@@ -130,7 +132,7 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest {
             new Column[]{new Column("value", NativeTypes.stringOf(256), false)}
     );
 
-    private static final HybridClock HYBRID_CLOCK = new HybridClockImpl();
+    private static final HybridClock CLOCK = new HybridClockImpl();
 
     private static final TableMessagesFactory TABLE_MSG_FACTORY = new TableMessagesFactory();
 
@@ -314,7 +316,8 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest {
                         incomingTxStateTableStorage,
                         mvGc,
                         mock(IndexUpdateHandler.class),
-                        mock(GcUpdateHandler.class)
+                        mock(GcUpdateHandler.class),
+                        mock(FullStateTransferIndexChooser.class)
                 )),
                 catalogService,
                 mock(SnapshotMeta.class),
@@ -336,10 +339,10 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest {
             for (int i = 0; i < rowIds.size(); i++) {
                 if (i % 2 == 0) {
                     // Writes committed version.
-                    storage.addWriteCommitted(rowIds.get(i), createRow("k" + i, "v" + i), HYBRID_CLOCK.now());
+                    storage.addWriteCommitted(rowIds.get(i), createRow("k" + i, "v" + i), CLOCK.now());
                 } else {
                     // Writes an intent to write (uncommitted version).
-                    storage.addWrite(rowIds.get(i), createRow("k" + i, "v" + i), UUID.randomUUID(), 999, TEST_PARTITION);
+                    storage.addWrite(rowIds.get(i), createRow("k" + i, "v" + i), generateTxId(), 999, TEST_PARTITION);
                 }
             }
 
@@ -362,7 +365,7 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest {
         for (int i = 0; i < txIds.size(); i++) {
             TxState txState = i % 2 == 0 ? COMMITTED : ABORTED;
 
-            storage.put(txIds.get(i), new TxMeta(txState, HYBRID_CLOCK.now()));
+            storage.put(txIds.get(i), new TxMeta(txState, CLOCK.now()));
         }
 
         storage.lastApplied(lastAppliedIndex, lastAppliedTerm);
@@ -564,7 +567,7 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest {
 
             return null;
         }).when(partitionSnapshotStorage.partition())
-                .addWrite(any(RowId.class), any(BinaryRow.class), any(UUID.class), anyInt(), anyInt());
+                .addWrite(any(RowId.class), any(BinaryRow.class), any(UUID.class), anyInt(), anyInt(), anyInt());
 
         // Let's start rebalancing.
         SnapshotCopier snapshotCopier = partitionSnapshotStorage.startToCopyFrom(
@@ -608,7 +611,7 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest {
 
         // Let's add an error on the rebalance.
         doThrow(StorageException.class).when(partitionSnapshotStorage.partition())
-                .addWrite(any(RowId.class), any(BinaryRow.class), any(UUID.class), anyInt(), anyInt());
+                .addWrite(any(RowId.class), any(BinaryRow.class), any(UUID.class), anyInt(), anyInt(), anyInt());
 
         // Let's start rebalancing.
         SnapshotCopier snapshotCopier = partitionSnapshotStorage.startToCopyFrom(
@@ -658,10 +661,10 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest {
 
     private static List<UUID> generateTxIds() {
         return List.of(
-                UUID.randomUUID(),
-                UUID.randomUUID(),
-                UUID.randomUUID(),
-                UUID.randomUUID()
+                generateTxId(),
+                generateTxId(),
+                generateTxId(),
+                generateTxId()
         );
     }
 
@@ -731,4 +734,8 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest {
         assertFalse(incomingMvPartitionStorage.scan(HybridTimestamp.MAX_VALUE).hasNext());
         assertFalse(incomingTxStatePartitionStorage.scan().hasNext());
     }
+
+    private static UUID generateTxId() {
+        return TransactionIds.transactionId(CLOCK.now(), 1);
+    }
 }
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
index bc0e92bb55..dc4588069d 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table;
 
 import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
 import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_ZONE_NAME;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.pkIndexName;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.sql.ColumnType.INT32;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -50,6 +51,9 @@ public class TableTestUtils {
     /** Index name. */
     public static final String INDEX_NAME = "TEST_INDEX";
 
+    /** Name of the primary key index for {@link #TABLE_NAME}. */
+    public static final String PK_INDEX_NAME = pkIndexName(TABLE_NAME);
+
     /** Column name. */
     public static final String COLUMN_NAME = "TEST_COLUMN";