You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2024/02/28 05:12:09 UTC

(ignite-3) branch main updated: IGNITE-21606 Add tuple upgrade during filtering index scans (#3299)

This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new e4e11c8b19 IGNITE-21606 Add tuple upgrade during filtering index scans (#3299)
e4e11c8b19 is described below

commit e4e11c8b190c28f8d4c2b4231150fc7bf4c0f503
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Wed Feb 28 08:12:04 2024 +0300

    IGNITE-21606 Add tuple upgrade during filtering index scans (#3299)
---
 .../internal/index/ItBuildIndexOneNodeTest.java    |   3 -
 ...xDistributedTestSingleNodeNoCleanupMessage.java |   7 +-
 .../internal/table/distributed/TableManager.java   |   3 +-
 .../replicator/PartitionReplicaListener.java       | 113 +++++++++++++++------
 .../PartitionReplicaListenerIndexLockingTest.java  |   4 +-
 .../replication/PartitionReplicaListenerTest.java  |   5 +-
 .../apache/ignite/distributed/ItTxTestCluster.java |  11 +-
 .../table/impl/DummyInternalTableImpl.java         |   4 +-
 .../table/impl/DummySchemaManagerImpl.java         |  59 ++++-------
 9 files changed, 128 insertions(+), 81 deletions(-)

diff --git a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java
index 8cf4826abd..9f134aed0c 100644
--- a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java
+++ b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java
@@ -47,7 +47,6 @@ import org.apache.ignite.tx.Transaction;
 import org.apache.ignite.tx.TransactionOptions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /** Integration test for testing the building of an index in a single node cluster. */
@@ -241,7 +240,6 @@ public class ItBuildIndexOneNodeTest extends BaseSqlIntegrationTest {
                 .check();
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21606")
     @Test
     void testBuildingIndexWithUpdateSchema() throws Exception {
         createZoneAndTable(ZONE_NAME, TABLE_NAME, 1, 1);
@@ -273,7 +271,6 @@ public class ItBuildIndexOneNodeTest extends BaseSqlIntegrationTest {
                 .check();
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21606")
     @Test
     void testBuildingIndexWithUpdateSchemaAfterCreateIndex() throws Exception {
         createZoneAndTable(ZONE_NAME, TABLE_NAME, 1, 1);
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index c383cc866c..2ea2fe0d74 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicaResult;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.table.TxAbstractTest;
 import org.apache.ignite.internal.table.distributed.IndexLocker;
@@ -160,7 +161,8 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage extends TxAbstractTes
                     CatalogService catalogService,
                     PlacementDriver placementDriver,
                     ClusterNodeResolver clusterNodeResolver,
-                    RemotelyTriggeredResourceRegistry resourcesRegistry
+                    RemotelyTriggeredResourceRegistry resourcesRegistry,
+                    SchemaRegistry schemaRegistry
             ) {
                 return new PartitionReplicaListener(
                         mvDataStorage,
@@ -184,7 +186,8 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage extends TxAbstractTes
                         catalogService,
                         placementDriver,
                         clusterNodeResolver,
-                        resourcesRegistry
+                        resourcesRegistry,
+                        schemaRegistry
                 ) {
                     @Override
                     public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, String senderId) {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 4ec8d85338..8af5a6dd37 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -990,7 +990,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                 catalogService,
                 executorInclinedPlacementDriver,
                 clusterService.topologyService(),
-                remotelyTriggeredResourceRegistry
+                remotelyTriggeredResourceRegistry,
+                schemaManager.schemaRegistry(tableId)
         );
     }
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 02bd232158..d675397819 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -30,6 +30,7 @@ import static org.apache.ignite.internal.table.distributed.replicator.RemoteReso
 import static org.apache.ignite.internal.table.distributed.replicator.ReplicatorUtils.beginRwTxTs;
 import static org.apache.ignite.internal.table.distributed.replicator.ReplicatorUtils.latestIndexDescriptorInBuildingStatus;
 import static org.apache.ignite.internal.table.distributed.replicator.ReplicatorUtils.rwTxActiveCatalogVersion;
+import static org.apache.ignite.internal.tx.TransactionIds.beginTimestamp;
 import static org.apache.ignite.internal.tx.TxState.ABANDONED;
 import static org.apache.ignite.internal.tx.TxState.ABORTED;
 import static org.apache.ignite.internal.tx.TxState.COMMITTED;
@@ -75,6 +76,7 @@ import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
 import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
 import org.apache.ignite.internal.catalog.events.StartBuildingIndexEventParameters;
@@ -106,6 +108,7 @@ import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
 import org.apache.ignite.internal.replicator.message.SchemaVersionAwareReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowUpgrader;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.schema.NullBinaryRow;
@@ -287,6 +290,8 @@ public class PartitionReplicaListener implements ReplicaListener {
     /** Listener for {@link CatalogEvent#INDEX_BUILDING}. */
     private final EventListener<CatalogEventParameters> indexBuildingCatalogEventListener = this::onIndexBuilding;
 
+    private final SchemaRegistry schemaRegistry;
+
     /**
      * The constructor.
      *
@@ -332,7 +337,8 @@ public class PartitionReplicaListener implements ReplicaListener {
             CatalogService catalogService,
             PlacementDriver placementDriver,
             ClusterNodeResolver clusterNodeResolver,
-            RemotelyTriggeredResourceRegistry remotelyTriggeredResourceRegistry
+            RemotelyTriggeredResourceRegistry remotelyTriggeredResourceRegistry,
+            SchemaRegistry schemaRegistry
     ) {
         this.mvDataStorage = mvDataStorage;
         this.raftClient = raftClient;
@@ -353,6 +359,7 @@ public class PartitionReplicaListener implements ReplicaListener {
         this.placementDriver = placementDriver;
         this.clusterNodeResolver = clusterNodeResolver;
         this.remotelyTriggeredResourceRegistry = remotelyTriggeredResourceRegistry;
+        this.schemaRegistry = schemaRegistry;
 
         this.replicationGroupId = new TablePartitionId(tableId, partId);
 
@@ -1091,9 +1098,6 @@ public class PartitionReplicaListener implements ReplicaListener {
     ) {
         IndexStorage indexStorage = schemaAwareIndexStorage.storage();
 
-        int batchCount = request.batchSize();
-        HybridTimestamp timestamp = request.readTimestamp();
-
         FullyQualifiedResourceId cursorId = cursorId(request.transactionId(), request.scanId());
 
         BinaryTuple key = request.exactKey().asBinaryTuple();
@@ -1104,12 +1108,22 @@ public class PartitionReplicaListener implements ReplicaListener {
                 () -> new CursorResource(indexStorage.get(key))
         ).cursor();
 
+        Cursor<IndexRow> indexRowCursor = CursorUtils.map(cursor, rowId -> new IndexRowImpl(key, rowId));
+
+        int batchCount = request.batchSize();
+
         var result = new ArrayList<BinaryRow>(batchCount);
 
-        Cursor<IndexRow> indexRowCursor = CursorUtils.map(cursor, rowId -> new IndexRowImpl(key, rowId));
+        HybridTimestamp readTimestamp = request.readTimestamp();
 
-        return continueReadOnlyIndexScan(schemaAwareIndexStorage, indexRowCursor, timestamp, batchCount, result)
-                .thenApply(ignore -> closeCursorIfBatchNotFull(result, batchCount, cursorId));
+        return continueReadOnlyIndexScan(
+                schemaAwareIndexStorage,
+                indexRowCursor,
+                readTimestamp,
+                batchCount,
+                result,
+                tableVersionByTs(readTimestamp)
+        ).thenApply(ignore -> closeCursorIfBatchNotFull(result, batchCount, cursorId));
     }
 
     private CompletableFuture<List<BinaryRow>> lookupIndex(
@@ -1161,7 +1175,6 @@ public class PartitionReplicaListener implements ReplicaListener {
         var indexStorage = (SortedIndexStorage) schemaAwareIndexStorage.storage();
 
         UUID txId = request.transactionId();
-        int batchCount = request.batchSize();
 
         FullyQualifiedResourceId cursorId = cursorId(txId, request.scanId());
 
@@ -1214,11 +1227,20 @@ public class PartitionReplicaListener implements ReplicaListener {
 
                         SortedIndexLocker indexLocker = (SortedIndexLocker) indexesLockers.get().get(indexId);
 
+                        int batchCount = request.batchSize();
+
                         var result = new ArrayList<BinaryRow>(batchCount);
 
-                        return continueIndexScan(txId, schemaAwareIndexStorage, indexLocker, cursor, batchCount, result,
-                                isUpperBoundAchieved)
-                                .thenApply(ignore -> closeCursorIfBatchNotFull(result, batchCount, cursorId));
+                        return continueIndexScan(
+                                txId,
+                                schemaAwareIndexStorage,
+                                indexLocker,
+                                cursor,
+                                batchCount,
+                                result,
+                                isUpperBoundAchieved,
+                                tableVersionByTs(beginTimestamp(txId))
+                        ).thenApply(ignore -> closeCursorIfBatchNotFull(result, batchCount, cursorId));
                     });
         });
     }
@@ -1236,11 +1258,7 @@ public class PartitionReplicaListener implements ReplicaListener {
     ) {
         var indexStorage = (SortedIndexStorage) schemaAwareIndexStorage.storage();
 
-        UUID txId = request.transactionId();
-        int batchCount = request.batchSize();
-        HybridTimestamp timestamp = request.readTimestamp();
-
-        FullyQualifiedResourceId cursorId = cursorId(txId, request.scanId());
+        FullyQualifiedResourceId cursorId = cursorId(request.transactionId(), request.scanId());
 
         BinaryTupleMessage lowerBoundMessage = request.lowerBoundPrefix();
         BinaryTupleMessage upperBoundMessage = request.upperBoundPrefix();
@@ -1257,18 +1275,29 @@ public class PartitionReplicaListener implements ReplicaListener {
                         flags
                 ))).cursor();
 
+        int batchCount = request.batchSize();
+
         var result = new ArrayList<BinaryRow>(batchCount);
 
-        return continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor, timestamp, batchCount, result)
-                .thenApply(ignore -> closeCursorIfBatchNotFull(result, batchCount, cursorId));
+        HybridTimestamp readTimestamp = request.readTimestamp();
+
+        return continueReadOnlyIndexScan(
+                schemaAwareIndexStorage,
+                cursor,
+                readTimestamp,
+                batchCount,
+                result,
+                tableVersionByTs(readTimestamp)
+        ).thenApply(ignore -> closeCursorIfBatchNotFull(result, batchCount, cursorId));
     }
 
     private CompletableFuture<Void> continueReadOnlyIndexScan(
             TableSchemaAwareIndexStorage schemaAwareIndexStorage,
             Cursor<IndexRow> cursor,
-            HybridTimestamp timestamp,
+            HybridTimestamp readTimestamp,
             int batchSize,
-            List<BinaryRow> result
+            List<BinaryRow> result,
+            int tableVersion
     ) {
         if (result.size() >= batchSize || !cursor.hasNext()) {
             return nullCompletedFuture();
@@ -1278,14 +1307,14 @@ public class PartitionReplicaListener implements ReplicaListener {
 
         RowId rowId = indexRow.rowId();
 
-        return resolvePlainReadResult(rowId, null, timestamp).thenComposeAsync(resolvedReadResult -> {
-            if (resolvedReadResult != null
-                    && resolvedReadResult.binaryRow() != null
-                    && indexRowMatches(indexRow, resolvedReadResult.binaryRow(), schemaAwareIndexStorage)) {
-                result.add(resolvedReadResult.binaryRow());
+        return resolvePlainReadResult(rowId, null, readTimestamp).thenComposeAsync(resolvedReadResult -> {
+            BinaryRow binaryRow = upgrage(binaryRow(resolvedReadResult), tableVersion);
+
+            if (binaryRow != null && indexRowMatches(indexRow, binaryRow, schemaAwareIndexStorage)) {
+                result.add(binaryRow);
             }
 
-            return continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor, timestamp, batchSize, result);
+            return continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor, readTimestamp, batchSize, result, tableVersion);
         }, scanRequestExecutor);
     }
 
@@ -1299,6 +1328,7 @@ public class PartitionReplicaListener implements ReplicaListener {
      * @param batchSize Batch size.
      * @param result Result collection.
      * @param isUpperBoundAchieved Function to stop on upper bound.
+     * @param tableVersion Table schema version at begin timestamp.
      * @return Future.
      */
     private CompletableFuture<Void> continueIndexScan(
@@ -1308,7 +1338,8 @@ public class PartitionReplicaListener implements ReplicaListener {
             Cursor<IndexRow> indexCursor,
             int batchSize,
             List<BinaryRow> result,
-            Predicate<IndexRow> isUpperBoundAchieved
+            Predicate<IndexRow> isUpperBoundAchieved,
+            int tableVersion
     ) {
         if (result.size() == batchSize) { // Batch is full, exit loop.
             return nullCompletedFuture();
@@ -1325,9 +1356,9 @@ public class PartitionReplicaListener implements ReplicaListener {
                     return lockManager.acquire(txId, new LockKey(tableId(), rowId), LockMode.S)
                             .thenComposeAsync(rowLock -> { // Table row S lock
                                 return resolvePlainReadResult(rowId, txId).thenCompose(resolvedReadResult -> {
-                                    if (resolvedReadResult != null
-                                            && resolvedReadResult.binaryRow() != null
-                                            && indexRowMatches(currentRow, resolvedReadResult.binaryRow(), schemaAwareIndexStorage)) {
+                                    BinaryRow binaryRow = upgrage(binaryRow(resolvedReadResult), tableVersion);
+
+                                    if (binaryRow != null && indexRowMatches(currentRow, binaryRow, schemaAwareIndexStorage)) {
                                         result.add(resolvedReadResult.binaryRow());
                                     }
 
@@ -1339,7 +1370,8 @@ public class PartitionReplicaListener implements ReplicaListener {
                                             indexCursor,
                                             batchSize,
                                             result,
-                                            isUpperBoundAchieved
+                                            isUpperBoundAchieved,
+                                            tableVersion
                                     );
                                 });
                             }, scanRequestExecutor);
@@ -1355,7 +1387,6 @@ public class PartitionReplicaListener implements ReplicaListener {
      * @return {@code true} if index row matches the binary row, {@code false} otherwise.
      */
     private static boolean indexRowMatches(IndexRow indexRow, BinaryRow binaryRow, TableSchemaAwareIndexStorage schemaAwareIndexStorage) {
-        // TODO: IGNITE-21606 It is necessary to upgrade the tuple to the required schema version
         BinaryTuple actualIndexRow = schemaAwareIndexStorage.indexRowResolver().extractColumns(binaryRow);
 
         return indexRow.indexColumns().byteBuffer().equals(actualIndexRow.byteBuffer());
@@ -3814,4 +3845,22 @@ public class PartitionReplicaListener implements ReplicaListener {
 
         return hybridTimestamp(catalog.time());
     }
+
+    private int tableVersionByTs(HybridTimestamp ts) {
+        int activeCatalogVersion = catalogService.activeCatalogVersion(ts.longValue());
+
+        CatalogTableDescriptor table = catalogService.table(tableId(), activeCatalogVersion);
+
+        assert table != null : "tableId=" + tableId() + ", catalogVersion=" + activeCatalogVersion;
+
+        return table.tableVersion();
+    }
+
+    private static @Nullable BinaryRow binaryRow(@Nullable TimedBinaryRow timedBinaryRow) {
+        return timedBinaryRow == null ? null : timedBinaryRow.binaryRow();
+    }
+
+    private @Nullable BinaryRow upgrage(@Nullable BinaryRow source, int targetSchemaVersion) {
+        return source == null ? null : new BinaryRowUpgrader(schemaRegistry, targetSchemaVersion).upgrade(source);
+    }
 }
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index 09ab21b745..7d3cfcb7b1 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -214,6 +214,7 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
         when(tableDescriptor.tableVersion()).thenReturn(schemaDescriptor.version());
 
         when(catalogService.table(anyInt(), anyLong())).thenReturn(tableDescriptor);
+        when(catalogService.table(anyInt(), anyInt())).thenReturn(tableDescriptor);
 
         CatalogIndexDescriptor indexDescriptor = mock(CatalogIndexDescriptor.class);
         when(indexDescriptor.id()).thenReturn(PK_INDEX_ID);
@@ -256,7 +257,8 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
                 catalogService,
                 new TestPlacementDriver(localNode),
                 mock(ClusterNodeResolver.class),
-                new RemotelyTriggeredResourceRegistry()
+                new RemotelyTriggeredResourceRegistry(),
+                schemaManager
         );
 
         kvMarshaller = new ReflectionMarshallerFactory().create(schemaDescriptor, Integer.class, Integer.class);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 0f09e614e1..8ecf0ac46f 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -173,6 +173,7 @@ import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
 import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
 import org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
+import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.tostring.IgniteToStringInclude;
 import org.apache.ignite.internal.tostring.S;
@@ -449,6 +450,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         when(validationSchemasSource.waitForSchemaAvailability(anyInt(), anyInt())).thenReturn(nullCompletedFuture());
 
         lenient().when(catalogService.table(anyInt(), anyLong())).thenReturn(tableDescriptor);
+        lenient().when(catalogService.table(anyInt(), anyInt())).thenReturn(tableDescriptor);
 
         int pkIndexId = 1;
         int sortedIndexId = 2;
@@ -577,7 +579,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                 catalogService,
                 placementDriver,
                 new SingleClusterNodeResolver(localNode),
-                new RemotelyTriggeredResourceRegistry()
+                new RemotelyTriggeredResourceRegistry(),
+                new DummySchemaManagerImpl(schemaDescriptor, schemaDescriptorVersion2)
         );
 
         kvMarshaller = marshallerFor(schemaDescriptor);
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index e4c6da5eed..fb02f31f39 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -97,6 +97,7 @@ import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.schema.BinaryRowConverter;
 import org.apache.ignite.internal.schema.ColumnsExtractor;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
@@ -473,6 +474,7 @@ public class ItTxTestCluster {
         when(tableDescriptor.tableVersion()).thenReturn(SCHEMA_VERSION);
 
         lenient().when(catalogService.table(eq(tableId), anyLong())).thenReturn(tableDescriptor);
+        lenient().when(catalogService.table(eq(tableId), anyInt())).thenReturn(tableDescriptor);
 
         List<Set<Assignment>> calculatedAssignments = AffinityUtils.calculateAssignments(
                 cluster.stream().map(node -> node.topologyService().localMember().name()).collect(toList()),
@@ -615,7 +617,8 @@ public class ItTxTestCluster {
                                         catalogService,
                                         placementDriver,
                                         nodeResolver,
-                                        cursorRegistries.get(assignment)
+                                        cursorRegistries.get(assignment),
+                                        schemaManager
                                 );
 
                                 replicaManagers.get(assignment).startReplica(
@@ -714,7 +717,8 @@ public class ItTxTestCluster {
             CatalogService catalogService,
             PlacementDriver placementDriver,
             ClusterNodeResolver clusterNodeResolver,
-            RemotelyTriggeredResourceRegistry resourcesRegistry
+            RemotelyTriggeredResourceRegistry resourcesRegistry,
+            SchemaRegistry schemaRegistry
     ) {
         return new PartitionReplicaListener(
                 mvDataStorage,
@@ -738,7 +742,8 @@ public class ItTxTestCluster {
                 catalogService,
                 placementDriver,
                 clusterNodeResolver,
-                resourcesRegistry
+                resourcesRegistry,
+                schemaRegistry
         );
     }
 
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 1ae5e6cc71..472ed405f1 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -364,6 +364,7 @@ public class DummyInternalTableImpl extends InternalTableImpl {
         CatalogTableDescriptor tableDescriptor = mock(CatalogTableDescriptor.class);
 
         lenient().when(catalogService.table(anyInt(), anyLong())).thenReturn(tableDescriptor);
+        lenient().when(catalogService.table(anyInt(), anyInt())).thenReturn(tableDescriptor);
         lenient().when(tableDescriptor.tableVersion()).thenReturn(1);
 
         CatalogIndexDescriptor indexDescriptor = mock(CatalogIndexDescriptor.class);
@@ -393,7 +394,8 @@ public class DummyInternalTableImpl extends InternalTableImpl {
                 catalogService,
                 new TestPlacementDriver(LOCAL_NODE),
                 mock(ClusterNodeResolver.class),
-                resourcesRegistry
+                resourcesRegistry,
+                schemaManager
         );
 
         partitionListener = new PartitionListener(
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
index da412a24a6..4717b88816 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
@@ -18,86 +18,71 @@
 package org.apache.ignite.internal.table.impl;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.schema.row.Row;
 
-/**
- * Dummy schema manager for tests.
- */
+/** Dummy schema manager for tests. */
 public class DummySchemaManagerImpl implements SchemaRegistry {
-    /** Schema. */
-    private final SchemaDescriptor schema;
-
-    /**
-     * Constructor.
-     *
-     * @param schema Schema descriptor.
-     */
-    public DummySchemaManagerImpl(SchemaDescriptor schema) {
-        assert schema != null;
-
-        this.schema = schema;
+    private final NavigableMap<Integer, SchemaDescriptor> schemaByVersion;
+
+    /** Constructor. */
+    public DummySchemaManagerImpl(SchemaDescriptor... schemas) {
+        assert !nullOrEmpty(schemas);
+
+        schemaByVersion = Stream.of(schemas).collect(toMap(SchemaDescriptor::version, identity(), (o, o2) -> o2, TreeMap::new));
     }
 
-    /** {@inheritDoc} */
     @Override
     public SchemaDescriptor lastKnownSchema() {
-        return schema;
+        return schemaByVersion.lastEntry().getValue();
     }
 
-    /** {@inheritDoc} */
     @Override
     public SchemaDescriptor schema(int version) {
-        assert version >= 0;
-        assert schema.version() == version;
+        assert schemaByVersion.containsKey(version) : version;
 
-        return schema;
+        return schemaByVersion.get(version);
     }
 
     @Override
     public CompletableFuture<SchemaDescriptor> schemaAsync(int version) {
-        assert version >= 0;
-        assert schema.version() == version;
-
-        return completedFuture(schema);
+        return completedFuture(schema(version));
     }
 
-    /** {@inheritDoc} */
     @Override
     public int lastKnownSchemaVersion() {
-        return schema.version();
+        return lastKnownSchema().version();
     }
 
-    /** {@inheritDoc} */
     @Override
     public Row resolve(BinaryRow row, SchemaDescriptor desc) {
         return Row.wrapBinaryRow(desc, row);
     }
 
-    /** {@inheritDoc} */
     @Override
     public Row resolve(BinaryRow row, int targetSchemaVersion) {
-        assert row.schemaVersion() == schema.version() || row.schemaVersion() == 0;
-        assert targetSchemaVersion == row.schemaVersion();
+        assert targetSchemaVersion == row.schemaVersion() : "row=" + row.schemaVersion() + ", target=" + targetSchemaVersion;
 
-        return Row.wrapBinaryRow(schema, row);
+        return Row.wrapBinaryRow(schema(targetSchemaVersion), row);
     }
 
     @Override
     public List<Row> resolve(Collection<BinaryRow> rows, int targetSchemaVersion) {
-        for (BinaryRow row : rows) {
-            assert row == null || row.schemaVersion() == targetSchemaVersion;
-        }
-
         return rows.stream()
-                .map(row -> row == null ? null : Row.wrapBinaryRow(schema(row.schemaVersion()), row))
+                .map(row -> row == null ? null : resolve(row, targetSchemaVersion))
                 .collect(toList());
     }