Posted to commits@ignite.apache.org by ib...@apache.org on 2023/09/13 06:56:34 UTC

[ignite-3] branch main updated: IGNITE-20044 Validate schema eligibility on each read/write operation in an RW transaction (#2566)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d1ae2d1d6 IGNITE-20044 Validate schema eligibility on each read/write operation in an RW transaction (#2566)
0d1ae2d1d6 is described below

commit 0d1ae2d1d67724f43abfaa8af4a2f85b9e328656
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Wed Sep 13 10:56:27 2023 +0400

    IGNITE-20044 Validate schema eligibility on each read/write operation in an RW transaction (#2566)
---
 .../Table/SchemaSynchronizationTest.cs             |   1 +
 .../org/apache/ignite/internal/SessionUtils.java   |   2 +-
 .../inmemory/ItRaftStorageVolatilityTest.java      |  55 +---
 .../runner/app/PlatformTestNodeRunner.java         | 216 ++++++++++++++-
 .../ignite/internal/runner/app/TableTestUtils.java |  62 +++++
 .../schemasync/ItSchemaSyncSingleNodeTest.java     | 215 +++++++++++++++
 .../ignite/internal/sql/api/ItCommonApiTest.java   |  40 +--
 .../ignite/internal/table/ItRoReadsTest.java       |  50 ++--
 .../org/apache/ignite/internal/app/IgniteImpl.java |  10 +
 .../table/distributed/TableIdRegistry.java         |  48 ++++
 .../RequestType.java => TableIdTranslator.java}    |  52 +---
 .../internal/table/distributed/TableManager.java   |  33 +++
 .../distributed/replicator/CatalogTables.java      |  42 +++
 .../replicator/CatalogTablesWithIdConversion.java  |  47 ++++
 .../RequestType.java => DirectCatalogTables.java}  |  54 ++--
 .../replicator/PartitionReplicaListener.java       | 298 +++++++++++++++------
 .../replicator/SchemaCompatValidator.java          |  24 +-
 .../distributed/replicator/action/RequestType.java |  40 ++-
 .../PartitionReplicaListenerIndexLockingTest.java  |   8 +
 .../replication/PartitionReplicaListenerTest.java  | 289 ++++++++++++++++----
 .../replicator/action/RequestTypeTest.java         |  60 +++++
 .../apache/ignite/distributed/ItTxTestCluster.java |  10 +
 .../replicator/action/RequestTypes.java            |  78 +++++-
 .../table/impl/DummyInternalTableImpl.java         |  11 +
 24 files changed, 1425 insertions(+), 320 deletions(-)
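
The user-visible effect of this change is exercised by the new ItSchemaSyncSingleNodeTest further down: once a table's
schema is altered after an RW transaction has already enlisted that table, any subsequent read or write through that
transaction is rejected and the transaction is rolled back. A condensed sketch of that behaviour (adapted from the test,
not taken verbatim from the commit; it assumes a running single-node IgniteImpl and the hypothetical executeSql helper
defined at the bottom):

    import static org.hamcrest.MatcherAssert.assertThat;
    import static org.hamcrest.Matchers.is;
    import static org.junit.jupiter.api.Assertions.assertThrows;

    import org.apache.ignite.internal.app.IgniteImpl;
    import org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaException;
    import org.apache.ignite.internal.tx.InternalTransaction;
    import org.apache.ignite.internal.tx.TxState;
    import org.apache.ignite.lang.ErrorGroups.Transactions;
    import org.apache.ignite.lang.IgniteException;
    import org.apache.ignite.table.Table;
    import org.apache.ignite.table.Tuple;

    class SchemaEligibilitySketch {
        void writeAfterAlterIsRejected(IgniteImpl node) {
            executeSql(node, "CREATE TABLE test (id int PRIMARY KEY, val varchar)");

            Table table = node.tables().table("test");
            InternalTransaction tx = (InternalTransaction) node.transactions().begin();

            // First access enlists the table in the RW transaction.
            table.keyValueView().get(tx, Tuple.create().set("id", 1));

            // Concurrent schema change after the transaction has started.
            executeSql(node, "ALTER TABLE test ADD COLUMN added int");

            // Any further read/write in the same transaction is now rejected...
            IgniteException ex = assertThrows(
                    IncompatibleSchemaException.class,
                    () -> table.keyValueView().put(tx, Tuple.create().set("id", 1), Tuple.create().set("val", "v")));

            assertThat(ex.code(), is(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR));

            // ...and the transaction ends up rolled back.
            assertThat(tx.state(), is(TxState.ABORTED));
        }

        // Hypothetical helper: runs a statement in an implicit transaction and closes the result set.
        private static void executeSql(IgniteImpl node, String sql) {
            try (var session = node.sql().createSession(); var ignored = session.execute(null, sql)) {
                // No rows to consume.
            }
        }
    }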

diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
index 99f13d0e64..ea7cf08de8 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
@@ -337,6 +337,7 @@ public class SchemaSynchronizationTest : IgniteTestsBase
     }
 
     [Test]
+    [Ignore("https://issues.apache.org/jira/browse/IGNITE-20399")]
     public async Task TestSchemaUpdateWhileStreaming([Values(true, false)] bool insertNewColumn)
     {
         await Client.Sql.ExecuteAsync(null, $"CREATE TABLE {TestTableName} (KEY bigint PRIMARY KEY)");
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/SessionUtils.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/SessionUtils.java
index d0562a1373..ad99e6567d 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/SessionUtils.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/SessionUtils.java
@@ -35,7 +35,7 @@ public class SessionUtils {
      *     be executed in an implicit transaction.
      */
     public static void executeUpdate(String sql, Session session, @Nullable Transaction transaction) {
-        try (ResultSet ignored = session.execute(transaction, sql)) {
+        try (ResultSet<?> ignored = session.execute(transaction, sql)) {
             // Do nothing, just adhere to the syntactic ceremony...
         }
     }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilityTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilityTest.java
index 5bd78aa357..e865b7573f 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilityTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilityTest.java
@@ -20,9 +20,7 @@ package org.apache.ignite.internal.inmemory;
 import static ca.seinesoftware.hamcrest.path.PathMatcher.exists;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.stream.Collectors.toList;
-import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_PARTITION_COUNT;
-import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
-import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.createZone;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.everyItem;
@@ -36,19 +34,13 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.raft.configuration.EntryCountBudgetChange;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
-import org.apache.ignite.internal.schema.testutils.SchemaConfigurationConverter;
-import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
-import org.apache.ignite.internal.schema.testutils.definition.ColumnType;
-import org.apache.ignite.internal.schema.testutils.definition.TableDefinition;
-import org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryDataStorageChange;
 import org.apache.ignite.internal.table.distributed.TableManager;
-import org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher;
 import org.junit.jupiter.api.Test;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
@@ -228,45 +220,24 @@ class ItRaftStorageVolatilityTest extends ClusterPerTestIntegrationTest {
 
     @Test
     void logSpillsOutToDisk() {
-        node(0).nodeConfiguration().getConfiguration(RaftConfiguration.KEY).change(cfg -> {
-            cfg.changeVolatileRaft(change -> {
-                change.changeLogStorage(budgetChange -> budgetChange.convert(EntryCountBudgetChange.class).changeEntriesCountLimit(1));
-            });
-        });
-
         createTableWithMaxOneInMemoryEntryAllowed("PERSON");
 
         executeSql("INSERT INTO PERSON(ID, NAME) VALUES (1, 'JOHN')");
         executeSql("INSERT INTO PERSON(ID, NAME) VALUES (2, 'JANE')");
     }
 
+    @SuppressWarnings("resource")
     private void createTableWithMaxOneInMemoryEntryAllowed(String tableName) {
-        DistributionZoneManager distributionZoneManager = node(0).distributionZoneManager();
-
-        String zoneName = "zone1";
-
-        createZone(
-                distributionZoneManager,
-                zoneName,
-                1,
-                DEFAULT_PARTITION_COUNT,
-                dataStorageChange -> dataStorageChange.convert(VolatilePageMemoryDataStorageChange.class)
-        );
-
-        int zoneId = distributionZoneManager.getZoneId(zoneName);
-
-        TableDefinition tableDef = SchemaBuilders.tableBuilder("PUBLIC", tableName).columns(
-                SchemaBuilders.column("ID", ColumnType.INT32).build(),
-                SchemaBuilders.column("NAME", ColumnType.string()).asNullable(true).build()
-        ).withPrimaryKey("ID").build();
+        CompletableFuture<Void> configUpdateFuture = node(0).nodeConfiguration().getConfiguration(RaftConfiguration.KEY).change(cfg -> {
+            cfg.changeVolatileRaft(change -> {
+                change.changeLogStorage(budgetChange -> budgetChange.convert(EntryCountBudgetChange.class).changeEntriesCountLimit(1));
+            });
+        });
+        assertThat(configUpdateFuture, willCompleteSuccessfully());
 
-        assertThat(
-                ((TableManager) node(0).tables()).createTableAsync(
-                        tableName,
-                        DEFAULT_ZONE_NAME,
-                        tableChange -> SchemaConfigurationConverter.convert(tableDef, tableChange).changeZoneId(zoneId)
-                ),
-                CompletableFutureMatcher.willCompleteSuccessfully()
-        );
+        cluster.doInSession(0, session -> {
+            session.execute(null, "create zone zone1 engine aimem with partitions=1, replicas=1");
+            session.execute(null, "create table " + tableName + " (id int primary key, name varchar) with primary_zone='ZONE1'");
+        });
     }
 }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
index b5dc5c9222..331027c3b4 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
@@ -18,10 +18,29 @@
 package org.apache.ignite.internal.runner.app;
 
 import static java.util.stream.Collectors.toList;
-import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.createZone;
+import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
+import static org.apache.ignite.internal.runner.app.TableTestUtils.createTable;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.escapeWindowsPath;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.getResourcePath;
+import static org.apache.ignite.sql.ColumnType.BITMASK;
+import static org.apache.ignite.sql.ColumnType.BOOLEAN;
+import static org.apache.ignite.sql.ColumnType.BYTE_ARRAY;
+import static org.apache.ignite.sql.ColumnType.DATE;
+import static org.apache.ignite.sql.ColumnType.DATETIME;
+import static org.apache.ignite.sql.ColumnType.DECIMAL;
+import static org.apache.ignite.sql.ColumnType.DOUBLE;
+import static org.apache.ignite.sql.ColumnType.FLOAT;
+import static org.apache.ignite.sql.ColumnType.INT16;
+import static org.apache.ignite.sql.ColumnType.INT32;
+import static org.apache.ignite.sql.ColumnType.INT64;
+import static org.apache.ignite.sql.ColumnType.INT8;
+import static org.apache.ignite.sql.ColumnType.NUMBER;
+import static org.apache.ignite.sql.ColumnType.STRING;
+import static org.apache.ignite.sql.ColumnType.TIME;
+import static org.apache.ignite.sql.ColumnType.TIMESTAMP;
+import static org.apache.ignite.sql.ColumnType.UUID;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import io.netty.util.ResourceLeakDetector;
 import java.io.IOException;
@@ -39,6 +58,7 @@ import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.JobExecutionContext;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
+import org.apache.ignite.internal.catalog.commands.ColumnParams;
 import org.apache.ignite.internal.client.proto.ColumnTypeConverter;
 import org.apache.ignite.internal.configuration.BasicAuthenticationProviderChange;
 import org.apache.ignite.internal.configuration.SecurityConfiguration;
@@ -258,16 +278,16 @@ public class PlatformTestNodeRunner {
     private static void createTables(Ignite node) {
         var keyCol = "key";
 
-        createZone(((IgniteImpl) node).distributionZoneManager(), ZONE_NAME, 10, 1);
+        IgniteImpl ignite = (IgniteImpl) node;
 
-        TableDefinition schTbl = SchemaBuilders.tableBuilder(SCHEMA_NAME, TABLE_NAME).columns(
-                SchemaBuilders.column(keyCol, ColumnType.INT64).build(),
-                SchemaBuilders.column("val", ColumnType.string()).asNullable(true).build()
-        ).withPrimaryKey(keyCol).build();
+        try (Session session = node.sql().createSession()) {
+            session.execute(null, "CREATE ZONE \"" + ZONE_NAME + "\" WITH partitions=10, replicas=1");
+        }
 
-        await(((TableManager) node.tables()).createTableAsync(schTbl.name(), ZONE_NAME, tblCh ->
-                SchemaConfigurationConverter.convert(schTbl, tblCh)
-        ));
+        try (Session session = node.sql().createSession()) {
+            session.execute(null, "CREATE TABLE " + TABLE_NAME + "(" + keyCol + " bigint PRIMARY KEY, val varchar) WITH primary_zone='"
+                    + ZONE_NAME + "'");
+        }
 
         int maxTimePrecision = TemporalColumnType.MAX_TIME_PRECISION;
 
@@ -298,6 +318,36 @@ public class PlatformTestNodeRunner {
                 SchemaConfigurationConverter.convert(schTblAll, tblCh)
         ));
 
+        createTable(
+                ignite.catalogManager(),
+                DEFAULT_SCHEMA_NAME,
+                ZONE_NAME,
+                TABLE_NAME_ALL_COLUMNS.toUpperCase(),
+                List.of(
+                        ColumnParams.builder().name(keyCol.toUpperCase()).type(INT64).build(),
+                        ColumnParams.builder().name("STR").type(STRING).nullable(true).build(),
+                        ColumnParams.builder().name("INT8").type(INT8).nullable(true).build(),
+                        ColumnParams.builder().name("INT16").type(INT16).nullable(true).build(),
+                        ColumnParams.builder().name("INT32").type(INT32).nullable(true).build(),
+                        ColumnParams.builder().name("INT64").type(INT64).nullable(true).build(),
+                        ColumnParams.builder().name("FLOAT").type(FLOAT).nullable(true).build(),
+                        ColumnParams.builder().name("DOUBLE").type(DOUBLE).nullable(true).build(),
+                        ColumnParams.builder().name("UUID").type(UUID).nullable(true).build(),
+                        ColumnParams.builder().name("DATE").type(DATE).nullable(true).build(),
+                        ColumnParams.builder().name("BITMASK").type(BITMASK).length(64).nullable(true).build(),
+                        ColumnParams.builder().name("TIME").type(TIME).precision(maxTimePrecision).nullable(true).build(),
+                        ColumnParams.builder().name("TIME2").type(TIME).precision(2).nullable(true).build(),
+                        ColumnParams.builder().name("DATETIME").type(DATETIME).precision(maxTimePrecision).nullable(true).build(),
+                        ColumnParams.builder().name("DATETIME2").type(DATETIME).precision(3).nullable(true).build(),
+                        ColumnParams.builder().name("TIMESTAMP").type(TIMESTAMP).precision(maxTimePrecision).nullable(true).build(),
+                        ColumnParams.builder().name("TIMESTAMP2").type(TIMESTAMP).precision(4).nullable(true).build(),
+                        ColumnParams.builder().name("BLOB").type(BYTE_ARRAY).nullable(true).build(),
+                        ColumnParams.builder().name("DECIMAL").type(DECIMAL).precision(19).scale(3).nullable(true).build(),
+                        ColumnParams.builder().name("BOOLEAN").type(BOOLEAN).nullable(true).build()
+                ),
+                List.of(keyCol.toUpperCase())
+        );
+
         // TODO IGNITE-18431 remove extra table, use TABLE_NAME_ALL_COLUMNS for SQL tests.
         TableDefinition schTblAllSql = SchemaBuilders.tableBuilder(SCHEMA_NAME, TABLE_NAME_ALL_COLUMNS_SQL).columns(
                 SchemaBuilders.column(keyCol, ColumnType.INT64).build(),
@@ -325,6 +375,36 @@ public class PlatformTestNodeRunner {
                 SchemaConfigurationConverter.convert(schTblAllSql, tblCh)
         ));
 
+        // TODO IGNITE-18431 remove extra table, use TABLE_NAME_ALL_COLUMNS for SQL tests.
+        createTable(
+                ignite.catalogManager(),
+                DEFAULT_SCHEMA_NAME,
+                ZONE_NAME,
+                TABLE_NAME_ALL_COLUMNS_SQL.toUpperCase(),
+                List.of(
+                        ColumnParams.builder().name(keyCol.toUpperCase()).type(INT64).build(),
+                        ColumnParams.builder().name("STR").type(STRING).nullable(true).build(),
+                        ColumnParams.builder().name("INT8").type(INT8).nullable(true).build(),
+                        ColumnParams.builder().name("INT16").type(INT16).nullable(true).build(),
+                        ColumnParams.builder().name("INT32").type(INT32).nullable(true).build(),
+                        ColumnParams.builder().name("INT64").type(INT64).nullable(true).build(),
+                        ColumnParams.builder().name("FLOAT").type(FLOAT).nullable(true).build(),
+                        ColumnParams.builder().name("DOUBLE").type(DOUBLE).nullable(true).build(),
+                        ColumnParams.builder().name("UUID").type(UUID).nullable(true).build(),
+                        ColumnParams.builder().name("DATE").type(DATE).nullable(true).build(),
+                        ColumnParams.builder().name("TIME").type(TIME).precision(maxTimePrecision).nullable(true).build(),
+                        ColumnParams.builder().name("TIME2").type(TIME).precision(maxTimePrecision).nullable(true).build(),
+                        ColumnParams.builder().name("DATETIME").type(DATETIME).precision(maxTimePrecision).nullable(true).build(),
+                        ColumnParams.builder().name("DATETIME2").type(DATETIME).precision(maxTimePrecision).nullable(true).build(),
+                        ColumnParams.builder().name("TIMESTAMP").type(TIMESTAMP).precision(maxTimePrecision).nullable(true).build(),
+                        ColumnParams.builder().name("TIMESTAMP2").type(TIMESTAMP).precision(maxTimePrecision).nullable(true).build(),
+                        ColumnParams.builder().name("BLOB").type(BYTE_ARRAY).nullable(true).build(),
+                        ColumnParams.builder().name("DECIMAL").type(DECIMAL).precision(19).scale(3).nullable(true).build(),
+                        ColumnParams.builder().name("BOOLEAN").type(BOOLEAN).nullable(true).build()
+                ),
+                List.of(keyCol.toUpperCase())
+        );
+
         createTwoColumnTable(node, ColumnType.INT8);
         createTwoColumnTable(node, ColumnType.BOOLEAN);
         createTwoColumnTable(node, ColumnType.INT16);
@@ -342,6 +422,108 @@ public class PlatformTestNodeRunner {
         createTwoColumnTable(node, ColumnType.number());
         createTwoColumnTable(node, ColumnType.blob());
         createTwoColumnTable(node, ColumnType.bitmaskOf(32));
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                ColumnParams.builder().name("KEY").type(INT8).build(),
+                ColumnParams.builder().name("VAL").type(INT8).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                ColumnParams.builder().name("KEY").type(BOOLEAN).build(),
+                ColumnParams.builder().name("VAL").type(BOOLEAN).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                ColumnParams.builder().name("KEY").type(INT16).build(),
+                ColumnParams.builder().name("VAL").type(INT16).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                ColumnParams.builder().name("KEY").type(INT32).build(),
+                ColumnParams.builder().name("VAL").type(INT32).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                ColumnParams.builder().name("KEY").type(INT64).build(),
+                ColumnParams.builder().name("VAL").type(INT64).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                ColumnParams.builder().name("KEY").type(FLOAT).build(),
+                ColumnParams.builder().name("VAL").type(FLOAT).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                ColumnParams.builder().name("KEY").type(DOUBLE).build(),
+                ColumnParams.builder().name("VAL").type(DOUBLE).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                ColumnParams.builder().name("KEY").type(org.apache.ignite.sql.ColumnType.UUID).build(),
+                ColumnParams.builder().name("VAL").type(org.apache.ignite.sql.ColumnType.UUID).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                ColumnParams.builder().name("KEY").type(DECIMAL).precision(19).scale(3).build(),
+                ColumnParams.builder().name("VAL").type(DECIMAL).precision(19).scale(3).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                ColumnParams.builder().name("KEY").type(STRING).build(),
+                ColumnParams.builder().name("VAL").type(STRING).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                ColumnParams.builder().name("KEY").type(DATE).build(),
+                ColumnParams.builder().name("VAL").type(DATE).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                ColumnParams.builder().name("KEY").type(DATETIME).precision(6).build(),
+                ColumnParams.builder().name("VAL").type(DATETIME).precision(6).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                ColumnParams.builder().name("KEY").type(TIME).precision(6).build(),
+                ColumnParams.builder().name("VAL").type(TIME).precision(6).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                ColumnParams.builder().name("KEY").type(TIMESTAMP).precision(6).build(),
+                ColumnParams.builder().name("VAL").type(TIMESTAMP).precision(6).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                ColumnParams.builder().name("KEY").type(NUMBER).precision(Integer.MAX_VALUE).build(),
+                ColumnParams.builder().name("VAL").type(NUMBER).precision(Integer.MAX_VALUE).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                ColumnParams.builder().name("KEY").type(BYTE_ARRAY).build(),
+                ColumnParams.builder().name("VAL").type(BYTE_ARRAY).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                ColumnParams.builder().name("KEY").type(BITMASK).length(32).build(),
+                ColumnParams.builder().name("VAL").type(BITMASK).length(32).nullable(true).build()
+        );
     }
 
     private static void createTwoColumnTable(Ignite node, ColumnType type) {
@@ -357,6 +539,22 @@ public class PlatformTestNodeRunner {
         ));
     }
 
+    private static void createTwoColumnTableInCatalog(IgniteImpl ignite, ColumnParams keyColumnParams, ColumnParams valueColumnParams) {
+        assertEquals(keyColumnParams.type(), valueColumnParams.type());
+
+        org.apache.ignite.sql.ColumnType typeForTableName = keyColumnParams.type();
+        String typeName = typeForTableName == BYTE_ARRAY ? "BYTES" : typeForTableName.name();
+
+        createTable(
+                ignite.catalogManager(),
+                DEFAULT_SCHEMA_NAME,
+                ZONE_NAME,
+                ("tbl_" + typeName).toUpperCase(),
+                List.of(keyColumnParams, valueColumnParams),
+                List.of(keyColumnParams.name())
+        );
+    }
+
     /**
      * Gets the thin client port.
      *
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/TableTestUtils.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/TableTestUtils.java
new file mode 100644
index 0000000000..f8add797f0
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/TableTestUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app;
+
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.List;
+import org.apache.ignite.internal.catalog.CatalogCommand;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.commands.ColumnParams;
+import org.apache.ignite.internal.catalog.commands.CreateTableCommand;
+
+/**
+ * Utils to manage tables inside tests.
+ */
+// TODO: IGNITE-19502 - remove after switching to the Catalog.
+public class TableTestUtils {
+    /**
+     * Creates table in the catalog.
+     *
+     * @param catalogManager Catalog manager.
+     * @param schemaName Schema name.
+     * @param zoneName Zone name.
+     * @param tableName Table name.
+     * @param columns Table columns.
+     * @param pkColumns Primary key columns.
+     */
+    public static void createTable(
+            CatalogManager catalogManager,
+            String schemaName,
+            String zoneName,
+            String tableName,
+            List<ColumnParams> columns,
+            List<String> pkColumns
+    ) {
+        CatalogCommand command = CreateTableCommand.builder()
+                .schemaName(schemaName)
+                .zone(zoneName)
+                .tableName(tableName)
+                .columns(columns)
+                .primaryKeyColumns(pkColumns)
+                .build();
+
+        assertThat(catalogManager.execute(command), willCompleteSuccessfully());
+    }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java
new file mode 100644
index 0000000000..ac82e5d461
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schemasync;
+
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaException;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.lang.ErrorGroups.Transactions;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/**
+ * Tests about basic Schema Synchronization properties that can be tested using just one Ignite node.
+ */
+class ItSchemaSyncSingleNodeTest extends ClusterPerTestIntegrationTest {
+    private static final int NODES_TO_START = 1;
+
+    private static final String TABLE_NAME = "test";
+    private static final String UNRELATED_TABLE_NAME = "unrelated_table";
+
+    private static final int KEY = 1;
+
+    private IgniteImpl node;
+
+    @Override
+    protected int initialNodes() {
+        return NODES_TO_START;
+    }
+
+    @BeforeEach
+    void assignNode() {
+        node = cluster.node(0);
+    }
+
+    /**
+     * Makes sure that the following sequence results in an operation failure and transaction rollback.
+     *
+     * <ol>
+     *     <li>A table is created</li>
+     *     <li>A transaction is started</li>
+     *     <li>The table is enlisted in the transaction</li>
+     *     <li>The table is ALTERed</li>
+     *     <li>An attempt to read or write to the table in the transaction is made</li>
+     * </ol>
+     */
+    @ParameterizedTest
+    @EnumSource(Operation.class)
+    void readWriteOperationInTxAfterAlteringSchemaOnTargetTableIsRejected(Operation operation) {
+        cluster.doInSession(0, session -> {
+            executeUpdate("CREATE TABLE " + TABLE_NAME + " (id int PRIMARY KEY, val varchar)", session);
+        });
+
+        Table table = node.tables().table(TABLE_NAME);
+
+        putPreExistingValueTo(table);
+
+        InternalTransaction tx = (InternalTransaction) node.transactions().begin();
+
+        enlistTableInTransaction(table, tx);
+
+        alterTable(TABLE_NAME);
+
+        IgniteException ex;
+
+        if (operation.sql()) {
+            ex = assertThrows(IgniteException.class, () -> operation.execute(table, tx, cluster));
+            assertThat(
+                    ex.getMessage(),
+                    containsString("Table schema was updated after the transaction was started [table=1, startSchema=1, operationSchema=2")
+            );
+        } else {
+            ex = assertThrows(IncompatibleSchemaException.class, () -> operation.execute(table, tx, cluster));
+            assertThat(
+                    ex.getMessage(),
+                    is("Table schema was updated after the transaction was started [table=1, startSchema=1, operationSchema=2]")
+            );
+        }
+
+        assertThat(ex.code(), is(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR));
+
+        // TODO: IGNITE-20342 - Assert for SQL too.
+        if (!operation.sql()) {
+            assertThat(tx.state(), is(TxState.ABORTED));
+        }
+    }
+
+    private void alterTable(String tableName) {
+        cluster.doInSession(0, session -> {
+            executeUpdate("ALTER TABLE " + tableName + " ADD COLUMN added int", session);
+        });
+    }
+
+    private static void putPreExistingValueTo(Table table) {
+        table.keyValueView().put(null, Tuple.create().set("id", KEY), Tuple.create().set("val", "original"));
+    }
+
+    private void enlistTableInTransaction(Table table, Transaction tx) {
+        executeReadOn(table, tx, cluster);
+    }
+
+    private static void executeReadOn(Table table, Transaction tx, Cluster cluster) {
+        cluster.doInSession(0, session -> {
+            executeUpdate("SELECT * FROM " + table.name(), session, tx);
+        });
+    }
+
+    private enum Operation {
+        KV_WRITE {
+            @Override
+            void execute(Table table, Transaction tx, Cluster cluster) {
+                putInTx(table, tx);
+            }
+
+            @Override
+            boolean sql() {
+                return false;
+            }
+        },
+        KV_READ {
+            @Override
+            void execute(Table table, Transaction tx, Cluster cluster) {
+                table.keyValueView().get(tx, Tuple.create().set("id", KEY));
+            }
+
+            @Override
+            boolean sql() {
+                return false;
+            }
+        },
+        SQL_WRITE {
+            @Override
+            void execute(Table table, Transaction tx, Cluster cluster) {
+                cluster.doInSession(0, session -> {
+                    executeUpdate("UPDATE " + table.name() + " SET val = 'new value' WHERE id = " + KEY, session, tx);
+                });
+            }
+
+            @Override
+            boolean sql() {
+                return true;
+            }
+        },
+        SQL_READ {
+            @Override
+            void execute(Table table, Transaction tx, Cluster cluster) {
+                executeReadOn(table, tx, cluster);
+            }
+
+            @Override
+            boolean sql() {
+                return true;
+            }
+        };
+
+        abstract void execute(Table table, Transaction tx, Cluster cluster);
+
+        abstract boolean sql();
+    }
+
+    private static void putInTx(Table table, Transaction tx) {
+        table.keyValueView().put(tx, Tuple.create().set("id", 1), Tuple.create().set("val", "one"));
+    }
+
+    /**
+     * Makes sure that ALTERation of a table does not affect a transaction in which it's not enlisted.
+     */
+    @Test
+    void readWriteOperationInTxAfterAlteringSchemaOnAnotherTableIsUnaffected() {
+        cluster.doInSession(0, session -> {
+            executeUpdate("CREATE TABLE " + TABLE_NAME + " (id int PRIMARY KEY, val varchar)", session);
+            executeUpdate("CREATE TABLE " + UNRELATED_TABLE_NAME + " (id int PRIMARY KEY, val varchar)", session);
+        });
+
+        Table table = node.tables().table(TABLE_NAME);
+
+        InternalTransaction tx = (InternalTransaction) node.transactions().begin();
+
+        enlistTableInTransaction(table, tx);
+
+        alterTable(UNRELATED_TABLE_NAME);
+
+        assertDoesNotThrow(() -> putInTx(table, tx));
+    }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
index 53bff13425..aec8028f5b 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
@@ -17,9 +17,7 @@
 
 package org.apache.ignite.internal.sql.api;
 
-import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
@@ -34,16 +32,10 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.ignite.Ignite;
-import org.apache.ignite.internal.schema.testutils.SchemaConfigurationConverter;
-import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
-import org.apache.ignite.internal.schema.testutils.definition.ColumnType;
-import org.apache.ignite.internal.schema.testutils.definition.ColumnType.TemporalColumnType;
-import org.apache.ignite.internal.schema.testutils.definition.TableDefinition;
 import org.apache.ignite.internal.sql.engine.ClusterPerClassIntegrationTest;
 import org.apache.ignite.internal.sql.engine.QueryCancelledException;
 import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
-import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.TxManager;
@@ -111,37 +103,27 @@ public class ItCommonApiTest extends ClusterPerClassIntegrationTest {
     @Test
     public void checkTimestampOperations() {
         String kvTblName = "tbl_all_columns_sql";
-        String schemaName = "PUBLIC";
-        String keyCol = "key";
-        int maxTimePrecision = TemporalColumnType.MAX_TIME_PRECISION;
+        String keyCol = "KEY";
 
         Ignite node = CLUSTER_NODES.get(0);
 
         // TODO: https://issues.apache.org/jira/browse/IGNITE-19162 Trim all less than millisecond information from timestamp
         //String tsStr = "2023-03-29T08:22:33.005007Z";
-        String tsStr = "2023-03-29T08:22:33.005Z";
+        // TODO: IGNITE-20105 it should be "2023-03-29T08:22:33.005Z";
+        String tsStr = "2023-03-29T08:22:33Z";
 
         Instant ins = Instant.parse(tsStr);
 
         sql("CREATE TABLE timestamps(id INTEGER PRIMARY KEY, i TIMESTAMP(9))");
 
-        TableDefinition schTblAllSql = SchemaBuilders.tableBuilder(schemaName, kvTblName).columns(
-                SchemaBuilders.column(keyCol, ColumnType.INT64).build(),
-                SchemaBuilders.column("time", ColumnType.time(maxTimePrecision)).asNullable(true).build(),
-                SchemaBuilders.column("timestamp", ColumnType.timestamp(maxTimePrecision)).asNullable(true).build(),
-                SchemaBuilders.column("datetime", ColumnType.datetime(maxTimePrecision)).asNullable(true).build()
-        ).withPrimaryKey(keyCol).build();
-
-        await(((TableManager) node.tables()).createTableAsync(schTblAllSql.name(), DEFAULT_ZONE_NAME, tblCh ->
-                SchemaConfigurationConverter.convert(schTblAllSql, tblCh)
-        ));
+        // TODO: IGNITE-19274 Add column with TIMESTAMP WITH LOCAL TIME ZONE
+        sql(String.format("CREATE TABLE %s(\"%s\" INTEGER PRIMARY KEY, \"TIMESTAMP\" TIMESTAMP(9))", kvTblName, keyCol));
 
         Table tbl = node.tables().table(kvTblName);
 
         Tuple rec = Tuple.create()
-                .set("KEY", 1L)
-                .set("TIMESTAMP", ins)
-                .set("DATETIME", LocalDateTime.of(2023, 1, 18, 18, 9, 29));
+                .set("KEY", 1)
+                .set("TIMESTAMP", LocalDateTime.of(2023, 3, 29, 8, 22, 33));
 
         tbl.recordView().insert(null, rec);
 
@@ -156,13 +138,15 @@ public class ItCommonApiTest extends ClusterPerClassIntegrationTest {
 
             String srtRepr = ins.toString();
 
-            assertEquals(srtRepr.substring(0, srtRepr.length() - 1), res.next().datetimeValue(0).toString());
+            String expDateTimeStr = srtRepr.substring(0, srtRepr.length() - 1);
+
+            assertEquals(expDateTimeStr, res.next().datetimeValue(0).toString());
 
-            String query = "select \"KEY\", \"TIME\", \"DATETIME\", \"TIMESTAMP\" from TBL_ALL_COLUMNS_SQL ORDER BY KEY";
+            String query = "select \"KEY\", \"TIMESTAMP\" from TBL_ALL_COLUMNS_SQL ORDER BY KEY";
 
             res = ses.execute(null, query);
 
-            assertEquals(ins, res.next().timestampValue(3));
+            assertEquals(expDateTimeStr, res.next().datetimeValue(1).toString());
         }
     }
 
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
index 8b89bf2a37..0bc84b1bb0 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
@@ -18,16 +18,14 @@
 package org.apache.ignite.internal.table;
 
 import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_REPLICA_COUNT;
-import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.createZone;
-import static org.apache.ignite.internal.runner.app.ItTablesApiTest.SCHEMA;
 import static org.apache.ignite.internal.schema.BinaryRowMatcher.equalToRow;
-import static org.apache.ignite.internal.schema.testutils.SchemaConfigurationConverter.convert;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -43,7 +41,6 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgnitionManager;
 import org.apache.ignite.InitParameters;
 import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.schema.BinaryRow;
@@ -53,10 +50,6 @@ import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
-import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
-import org.apache.ignite.internal.schema.testutils.definition.ColumnDefinition;
-import org.apache.ignite.internal.schema.testutils.definition.ColumnType;
-import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.testframework.TestIgnitionManager;
 import org.apache.ignite.internal.testframework.WorkDirectory;
@@ -64,6 +57,7 @@ import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.sql.Session;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
@@ -83,7 +77,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
 public class ItRoReadsTest extends BaseIgniteAbstractTest {
     private static final IgniteLogger LOG = Loggers.forClass(ItRoReadsTest.class);
 
-    private static final String TABLE_NAME = "some-table";
+    private static final String TABLE_NAME = "SOME_TABLE";
 
     private static final SchemaDescriptor SCHEMA_1 = new SchemaDescriptor(
             1,
@@ -509,28 +503,36 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest {
     }
 
     private static Table startTable(Ignite node, String tableName) {
-        List<ColumnDefinition> cols = new ArrayList<>();
-        cols.add(SchemaBuilders.column("key", ColumnType.INT64).build());
-        cols.add(SchemaBuilders.column("valInt", ColumnType.INT32).asNullable(true).build());
-        cols.add(SchemaBuilders.column("valStr", ColumnType.string()).withDefaultValue("default").build());
+        String zoneName = zoneNameForTable(tableName);
+
+        try (Session session = node.sql().createSession()) {
+            session.execute(null, String.format("create zone \"%s\" with partitions=1, replicas=%d", zoneName, DEFAULT_REPLICA_COUNT));
+
+            session.execute(null,
+                    String.format(
+                            "create table \"%s\" (key bigint primary key, valInt int, valStr varchar default 'default') "
+                                    + "with primary_zone='%s'",
+                            tableName, zoneName
+                    )
+            );
+        }
 
-        String zoneName = "zone_" + tableName;
-        createZone(((IgniteImpl) node).distributionZoneManager(), zoneName, 1, DEFAULT_REPLICA_COUNT);
+        Table table = node.tables().table(tableName);
 
-        CompletableFuture<Table> createTableCompletableFuture = ((TableManager) node.tables()).createTableAsync(
-                tableName,
-                zoneName,
-                tblCh -> convert(SchemaBuilders.tableBuilder(SCHEMA, tableName).columns(cols).withPrimaryKey("key").build(), tblCh)
-        );
+        assertNotNull(table);
 
-        assertThat(createTableCompletableFuture, willCompleteSuccessfully());
+        return table;
+    }
 
-        return createTableCompletableFuture.join();
+    private static String zoneNameForTable(String tableName) {
+        return "ZONE_" + tableName;
     }
 
     private static void stopTable(Ignite node, String tableName) {
-        assertThat(((TableManager) node.tables()).dropTableAsync(tableName), willCompleteSuccessfully());
-        DistributionZonesTestUtil.dropZone(((IgniteImpl) node).distributionZoneManager(), "zone_" + tableName);
+        try (Session session = node.sql().createSession()) {
+            session.execute(null, "drop table " + tableName);
+            session.execute(null, "drop zone " + zoneNameForTable(tableName));
+        }
     }
 
     protected static int nodes() {
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 37c9062aca..990284ff99 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1132,4 +1132,14 @@ public class IgniteImpl implements Ignite {
     public PlacementDriver placementDriver() {
         return placementDriverMgr.placementDriver();
     }
+
+    /**
+     * Returns the CatalogManager.
+     *
+     * @return Catalog manager.
+     */
+    @TestOnly
+    public CatalogManager catalogManager() {
+        return catalogManager;
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableIdRegistry.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableIdRegistry.java
new file mode 100644
index 0000000000..80b6c1e903
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableIdRegistry.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Registry for configTableId->catalogTableId pairs.
+ */
+// TODO: IGNITE-20386 - remove after the switch to the Catalog.
+public class TableIdRegistry implements TableIdTranslator {
+    private final Map<Integer, Integer> configToCatalogIds = new ConcurrentHashMap<>();
+
+    @Override
+    public int configIdToCatalogId(int configTableId) {
+        Integer catalogId = configToCatalogIds.get(configTableId);
+
+        assert catalogId != null : "Translating " + configTableId + ", but nothing was found";
+
+        return catalogId;
+    }
+
+    /**
+     * Registers a mapping of one table ID to another (both belong to the same table).
+     *
+     * @param configTableId Config-defined table ID.
+     * @param catalogTableId Catalog-defined table ID.
+     */
+    public void registerMapping(int configTableId, int catalogTableId) {
+        configToCatalogIds.put(configTableId, catalogTableId);
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/action/RequestType.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableIdTranslator.java
similarity index 58%
copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/action/RequestType.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableIdTranslator.java
index 6baae3a1c5..fa5eacf80c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/action/RequestType.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableIdTranslator.java
@@ -15,47 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.table.distributed.replicator.action;
+package org.apache.ignite.internal.table.distributed;
 
 /**
- * Transaction operation type.
+ * Translates configuration-defined table IDs to Catalog-defined table IDs.
  */
-public enum RequestType {
-    RW_GET,
-
-    RW_GET_ALL,
-
-    RW_DELETE,
-
-    RW_DELETE_ALL,
-
-    RW_DELETE_EXACT,
-
-    RW_DELETE_EXACT_ALL,
-
-    RW_INSERT,
-
-    RW_INSERT_ALL,
-
-    RW_UPSERT,
-
-    RW_UPSERT_ALL,
-
-    RW_REPLACE,
-
-    RW_REPLACE_IF_EXIST,
-
-    RW_GET_AND_DELETE,
-
-    RW_GET_AND_REPLACE,
-
-    RW_GET_AND_UPSERT,
-
-    RW_SCAN,
-
-    RO_GET,
-
-    RO_GET_ALL,
-
-    RO_SCAN
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
+// TODO: IGNITE-20386 - remove after the switch to the Catalog.
+public interface TableIdTranslator {
+    /**
+     * Translates a configuration-defined table ID to a Catalog-defined table ID.
+     *
+     * @param configTableId Configuration-defined table ID to translate.
+     * @return Catalog-defined table ID corresponding to the argument.
+     */
+    int configIdToCatalogId(int configTableId);
 }
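
A quick illustration of how the new registry and translator are meant to be used (a sketch only, not part of the commit;
the real wiring lives in the TableManager changes below, where mappings are registered on CatalogEvent.TABLE_CREATE and
looked up when building replica listeners):

    import org.apache.ignite.internal.table.distributed.TableIdRegistry;
    import org.apache.ignite.internal.table.distributed.TableIdTranslator;

    class TableIdMappingSketch {
        int lookUpCatalogId() {
            TableIdRegistry registry = new TableIdRegistry();

            // On table creation, the config-defined and Catalog-defined IDs of the same table
            // are paired up (the IDs below are made up for illustration).
            registry.registerMapping(5, 12);

            // Components that still operate with config-defined IDs can then reach Catalog metadata.
            TableIdTranslator translator = registry;
            return translator.configIdToCatalogId(5); // returns 12
        }
    }
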
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index b63129b35f..1bbb0b1e45 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -93,6 +93,8 @@ import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.descriptors.CatalogDataStorageDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
 import org.apache.ignite.internal.causality.CompletionListener;
 import org.apache.ignite.internal.causality.IncrementalVersionedValue;
 import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
@@ -165,6 +167,7 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionSnapshotStorageFactory;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.SnapshotAwarePartitionDataStorage;
+import org.apache.ignite.internal.table.distributed.replicator.CatalogTablesWithIdConversion;
 import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
 import org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
 import org.apache.ignite.internal.table.distributed.schema.NonHistoricSchemas;
@@ -395,6 +398,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
     /** Placement driver. */
     private final PlacementDriver placementDriver;
 
+    private final TableIdRegistry tableIdTranslator = new TableIdRegistry();
+
     /**
      * Creates a new table manager.
      *
@@ -582,6 +587,32 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         });
 
         addMessageHandler(clusterService.messagingService());
+
+        // TODO: IGNITE-19499 - remove when switched to the Catalog.
+        recoverTableIdsMapping();
+
+        // TODO: IGNITE-19499 - remove when switched to the Catalog.
+        catalogService.listen(CatalogEvent.TABLE_CREATE, (parameters, exception) -> {
+            CreateTableEventParameters event = (CreateTableEventParameters) parameters;
+
+            TableView tableConfig = tablesCfg.tables().value().get(event.tableDescriptor().name());
+
+            assert tableConfig != null : "No table config found by name " + event.tableDescriptor().name();
+
+            tableIdTranslator.registerMapping(tableConfig.id(), event.tableId());
+
+            return completedFuture(false);
+        });
+    }
+
+    private void recoverTableIdsMapping() {
+        tablesCfg.tables().value().forEach(tableView -> {
+            CatalogTableDescriptor tableDescriptor = catalogService.table(tableView.name(), Long.MAX_VALUE);
+
+            assert tableDescriptor != null : "No table in the Catalog with name " + tableView.name();
+
+            tableIdTranslator.registerMapping(tableView.id(), tableDescriptor.id());
+        });
     }
 
     private CompletableFuture<Void> performRebalanceOnRecovery(long revision) {
@@ -1056,6 +1087,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                 indexBuilder,
                 schemaSyncService,
                 catalogService,
+                // TODO: IGNITE-19499 - replace with DirectCatalogTables
+                new CatalogTablesWithIdConversion(catalogService, tableIdTranslator),
                 tablesCfg,
                 placementDriver
         );
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/CatalogTables.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/CatalogTables.java
new file mode 100644
index 0000000000..d68c36b0d9
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/CatalogTables.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * This interface is needed to cope with the fact that, for some time, we have both configuration-defined and
+ * Catalog-defined table IDs (which are usually not equal to each other even for the same table); it allows hiding
+ * the translation logic, as most of the code operates with config-defined table IDs, while {@link CatalogService}
+ * requires its own table IDs.
+ */
+// TODO: IGNITE-20386 - remove after the switch to the Catalog.
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
+public interface CatalogTables {
+    /**
+     * Gets a table by a timestamp. Make sure there is enough metadata (see {@link SchemaSyncService}) before invoking this method.
+     *
+     * @param tableId ID of the table (before the switch to the Catalog, these are config-defined IDs).
+     * @param timestamp Timestamp at which to look for a table.
+     * @return Table descriptor or {@code null} if none was found.
+     */
+    @Nullable CatalogTableDescriptor table(int tableId, long timestamp);
+}
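
(Illustrative sketch, not part of the patch: a minimal consumer of the interface above. The helper name and the IllegalStateException are hypothetical; the table(int, long) call and the descriptor/timestamp types are the ones used elsewhere in this commit.)

    import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
    import org.apache.ignite.internal.hlc.HybridTimestamp;

    /** Resolves the descriptor that was active at the given timestamp, failing if the table was absent. */
    static CatalogTableDescriptor tableOrThrow(CatalogTables catalogTables, int tableId, HybridTimestamp ts) {
        // Callers are expected to have waited for metadata completeness (SchemaSyncService) first.
        CatalogTableDescriptor descriptor = catalogTables.table(tableId, ts.longValue());

        if (descriptor == null) {
            throw new IllegalStateException("Table " + tableId + " does not exist at " + ts);
        }

        return descriptor;
    }
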
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/CatalogTablesWithIdConversion.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/CatalogTablesWithIdConversion.java
new file mode 100644
index 0000000000..caadf73246
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/CatalogTablesWithIdConversion.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.table.distributed.TableIdTranslator;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Implementation that accepts configuration-defined table IDs, converts them under the hood to Catalog-defined ones,
+ * and uses them to access table information in the {@link CatalogService}.
+ */
+// TODO: IGNITE-20386 - replace all usages with DirectCatalogTables and remove after the switch to the Catalog.
+public class CatalogTablesWithIdConversion implements CatalogTables {
+    private final CatalogService catalogService;
+
+    private final TableIdTranslator tableIdTranslator;
+
+    /** Constructor. */
+    public CatalogTablesWithIdConversion(CatalogService catalogService, TableIdTranslator tableIdTranslator) {
+        this.catalogService = catalogService;
+        this.tableIdTranslator = tableIdTranslator;
+    }
+
+    @Override
+    public @Nullable CatalogTableDescriptor table(int tableId, long timestamp) {
+        int catalogTableId = tableIdTranslator.configIdToCatalogId(tableId);
+
+        return catalogService.table(catalogTableId, timestamp);
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/action/RequestType.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/DirectCatalogTables.java
similarity index 50%
copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/action/RequestType.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/DirectCatalogTables.java
index 6baae3a1c5..20b73d7977 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/action/RequestType.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/DirectCatalogTables.java
@@ -15,47 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.table.distributed.replicator.action;
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.jetbrains.annotations.Nullable;
 
 /**
- * Transaction operation type.
+ * Implementation that does not perform any table ID translation; it should be used after we switch to the Catalog
+ * completely (when only Catalog-defined table IDs remain and there are no configuration-defined IDs).
  */
-public enum RequestType {
-    RW_GET,
-
-    RW_GET_ALL,
-
-    RW_DELETE,
-
-    RW_DELETE_ALL,
-
-    RW_DELETE_EXACT,
-
-    RW_DELETE_EXACT_ALL,
-
-    RW_INSERT,
-
-    RW_INSERT_ALL,
-
-    RW_UPSERT,
-
-    RW_UPSERT_ALL,
-
-    RW_REPLACE,
-
-    RW_REPLACE_IF_EXIST,
-
-    RW_GET_AND_DELETE,
-
-    RW_GET_AND_REPLACE,
-
-    RW_GET_AND_UPSERT,
-
-    RW_SCAN,
-
-    RO_GET,
+public class DirectCatalogTables implements CatalogTables {
+    private final CatalogService catalogService;
 
-    RO_GET_ALL,
+    public DirectCatalogTables(CatalogService catalogService) {
+        this.catalogService = catalogService;
+    }
 
-    RO_SCAN
+    @Override
+    public @Nullable CatalogTableDescriptor table(int tableId, long timestamp) {
+        return catalogService.table(tableId, timestamp);
+    }
 }
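
(Illustrative note, not part of the patch: per the TODOs for IGNITE-19499/IGNITE-20386 above, once tables are fully Catalog-driven the translation layer disappears and the wiring in TableManager reduces to a direct pass-through, roughly:)

    // Hypothetical wiring after the switch to the Catalog: no table ID translation is needed.
    CatalogTables catalogTables = new DirectCatalogTables(catalogService);
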
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 2ac0c90e26..7bb2587837 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -308,6 +308,7 @@ public class PartitionReplicaListener implements ReplicaListener {
             IndexBuilder indexBuilder,
             SchemaSyncService schemaSyncService,
             CatalogService catalogService,
+            CatalogTables catalogTables,
             TablesConfiguration tablesConfig,
             PlacementDriver placementDriver
     ) {
@@ -336,7 +337,7 @@ public class PartitionReplicaListener implements ReplicaListener {
 
         cursors = new ConcurrentSkipListMap<>(IgniteUuid.globalOrderComparator());
 
-        schemaCompatValidator = new SchemaCompatValidator(schemas);
+        schemaCompatValidator = new SchemaCompatValidator(schemas, catalogTables);
 
         TableView tableConfig = findTableView(tablesConfig.tables().value(), tableId);
 
@@ -389,6 +390,14 @@ public class PartitionReplicaListener implements ReplicaListener {
 
             // Implicit RW scan can be committed locally on the last batch or on an error.
             return appendTxCommand(req.transactionId(), RequestType.RW_SCAN, false, () -> processScanRetrieveBatchAction(req, senderId))
+                    .thenCompose(rows -> {
+                        if (allElementsAreNull(rows)) {
+                            return completedFuture(rows);
+                        } else {
+                            return validateAtTimestamp(req.transactionId())
+                                    .thenApply(ignored -> rows);
+                        }
+                    })
                     .handle((rows, err) -> {
                         if (req.full() && (err != null || rows.size() < req.batchSize())) {
                             releaseTxLocks(req.transactionId());
@@ -784,8 +793,8 @@ public class PartitionReplicaListener implements ReplicaListener {
 
         IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
 
-        return lockManager.acquire(txId, new LockKey(tableId()), LockMode.S).thenCompose(tblLock ->
-                retrieveExactEntriesUntilCursorEmpty(txId, cursorId, batchCount));
+        return lockManager.acquire(txId, new LockKey(tableId()), LockMode.S)
+                .thenCompose(tblLock -> retrieveExactEntriesUntilCursorEmpty(txId, cursorId, batchCount));
     }
 
     /**
@@ -1245,7 +1254,7 @@ public class PartitionReplicaListener implements ReplicaListener {
         HybridTimestamp currentTimestamp = hybridClock.now();
         HybridTimestamp commitTimestamp = commit ? currentTimestamp : null;
 
-        return catalogVersionFor(currentTimestamp)
+        return reliableCatalogVersionFor(currentTimestamp)
                 .thenApply(catalogVersion -> {
                     FinishTxCommandBuilder finishTxCmdBldr = MSG_FACTORY.finishTxCommand()
                             .txId(txId)
@@ -1313,7 +1322,7 @@ public class PartitionReplicaListener implements ReplicaListener {
             }
 
             txOps.futures.forEach((opType, futures) -> {
-                if (opType == RequestType.RW_GET || opType == RequestType.RW_GET_ALL || opType == RequestType.RW_SCAN) {
+                if (opType.isRwRead()) {
                     txReadFutures.addAll(futures);
                 } else {
                     txUpdateFutures.addAll(futures);
@@ -1339,7 +1348,7 @@ public class PartitionReplicaListener implements ReplicaListener {
         return allOffFuturesExceptionIgnored(txUpdateFutures, request).thenCompose(v -> {
             long commandTimestamp = hybridClock.nowLong();
 
-            return catalogVersionFor(hybridTimestamp(commandTimestamp))
+            return reliableCatalogVersionFor(hybridTimestamp(commandTimestamp))
                     .thenCompose(catalogVersion -> {
                         TxCleanupCommand txCleanupCmd = MSG_FACTORY.txCleanupCommand()
                                 .txId(request.txId())
@@ -1660,9 +1669,9 @@ public class PartitionReplicaListener implements ReplicaListener {
                         return completedFuture(result);
                     }
 
-                    return updateAllCommand(request, rowIdsToDelete, txCoordinatorId)
-                                .thenCompose(this::applyUpdateAllCommand)
-                                .thenApply(v -> result);
+                    return validateAtTimestampAndBuildUpdateAllCommand(request, rowIdsToDelete, txCoordinatorId)
+                            .thenCompose(this::applyUpdateAllCommand)
+                            .thenApply(ignored -> result);
                 });
             }
             case RW_INSERT_ALL: {
@@ -1716,7 +1725,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                             ));
 
                     return allOf(insertLockFuts)
-                            .thenCompose(ignored -> updateAllCommand(request, convertedMap, txCoordinatorId))
+                            .thenCompose(ignored -> validateAtTimestampAndBuildUpdateAllCommand(request, convertedMap, txCoordinatorId))
                             .thenCompose(this::applyUpdateAllCommand)
                             .thenApply(ignored -> {
                                 // Release short term locks.
@@ -1760,7 +1769,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                         return completedFuture(null);
                     }
 
-                    return updateAllCommand(request, rowsToUpdate, txCoordinatorId)
+                    return validateAtTimestampAndBuildUpdateAllCommand(request, rowsToUpdate, txCoordinatorId)
                             .thenCompose(this::applyUpdateAllCommand)
                             .thenRun(() -> {
                                 // Release short term locks.
@@ -1785,7 +1794,7 @@ public class PartitionReplicaListener implements ReplicaListener {
      * @param txCoordinatorId Transaction coordinator id.
      * @return Listener response.
      */
-    private CompletableFuture<Object> processMultiEntryAction(ReadWriteMultiRowPkReplicaRequest request, String txCoordinatorId) {
+    private CompletableFuture<?> processMultiEntryAction(ReadWriteMultiRowPkReplicaRequest request, String txCoordinatorId) {
         UUID txId = request.transactionId();
         TablePartitionId committedPartitionId = request.commitPartitionId().asTablePartitionId();
         List<BinaryTuple> primaryKeys = resolvePks(request.primaryKeys());
@@ -1809,14 +1818,19 @@ public class PartitionReplicaListener implements ReplicaListener {
                 }
 
                 return allOf(rowFuts)
-                        .thenApply(ignored -> {
+                        .thenCompose(ignored -> {
                             var result = new ArrayList<BinaryRow>(primaryKeys.size());
 
                             for (CompletableFuture<BinaryRow> rowFut : rowFuts) {
                                 result.add(rowFut.join());
                             }
 
-                            return result;
+                            if (allElementsAreNull(result)) {
+                                return completedFuture(result);
+                            }
+
+                            return validateAtTimestamp(txId)
+                                    .thenApply(unused -> result);
                         });
             }
             case RW_DELETE_ALL: {
@@ -1852,7 +1866,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                         return completedFuture(result);
                     }
 
-                    return updateAllCommand(request, rowIdsToDelete, txCoordinatorId)
+                    return validateAtTimestampAndBuildUpdateAllCommand(request, rowIdsToDelete, txCoordinatorId)
                             .thenCompose(this::applyUpdateAllCommand)
                             .thenApply(ignored -> result);
                 });
@@ -1864,6 +1878,16 @@ public class PartitionReplicaListener implements ReplicaListener {
         }
     }
 
+    private static <T> boolean allElementsAreNull(List<T> list) {
+        for (T element : list) {
+            if (element != null) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
     /**
      * Executes a command and handles exceptions. A result future can be finished with exception by following rules:
      * <ul>
@@ -2001,9 +2025,9 @@ public class PartitionReplicaListener implements ReplicaListener {
                                     return completedFuture(false);
                                 }
 
-                                return updateCommand(request, validatedRowId.uuid(), null, txCoordinatorId)
+                                return validateAtTimestampAndBuildUpdateCommand(request, validatedRowId.uuid(), null, txCoordinatorId)
                                         .thenCompose(this::applyUpdateCommand)
-                                        .thenApply(ignored -> true);
+                                        .thenApply(ignored -> (Boolean) true);
                             });
                 });
             }
@@ -2016,13 +2040,15 @@ public class PartitionReplicaListener implements ReplicaListener {
                     RowId rowId0 = new RowId(partId(), UUID.randomUUID());
 
                     return takeLocksForInsert(searchRow, rowId0, txId)
-                            .thenCompose(rowIdLock -> updateCommand(request, rowId0.uuid(), searchRow, txCoordinatorId)
+                            .thenCompose(rowIdLock -> validateAtTimestampAndBuildUpdateCommand(request, rowId0.uuid(), searchRow,
+                                        txCoordinatorId
+                                    )
                                     .thenCompose(this::applyUpdateCommand)
                                     .thenApply(v -> {
                                         // Release short term locks.
                                         rowIdLock.get2().forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
 
-                                        return true;
+                                        return (Boolean) true;
                                     }));
                 });
             }
@@ -2037,12 +2063,17 @@ public class PartitionReplicaListener implements ReplicaListener {
                             : takeLocksForUpdate(searchRow, rowId0, txId);
 
                     return lockFut
-                            .thenCompose(rowIdLock -> updateCommand(request, rowId0.uuid(), searchRow, txCoordinatorId)
+                            .thenCompose(rowIdLock -> validateAtTimestampAndBuildUpdateCommand(request, rowId0.uuid(), searchRow,
+                                        txCoordinatorId
+                                    )
                                     .thenCompose(this::applyUpdateCommand)
-                                    .thenRun(() -> {
+                                    .thenApply(ignored -> rowIdLock))
+                                    .thenApply(rowIdLock -> {
                                         // Release short term locks.
                                         rowIdLock.get2().forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
-                                    }));
+
+                                        return null;
+                                    });
                 });
             }
             case RW_GET_AND_UPSERT: {
@@ -2056,7 +2087,9 @@ public class PartitionReplicaListener implements ReplicaListener {
                             : takeLocksForUpdate(searchRow, rowId0, txId);
 
                     return lockFut
-                            .thenCompose(rowIdLock -> updateCommand(request, rowId0.uuid(), searchRow, txCoordinatorId)
+                            .thenCompose(rowIdLock -> validateAtTimestampAndBuildUpdateCommand(request, rowId0.uuid(), searchRow,
+                                        txCoordinatorId
+                                    )
                                     .thenCompose(this::applyUpdateCommand)
                                     .thenApply(v -> {
                                         // Release short term locks.
@@ -2073,7 +2106,9 @@ public class PartitionReplicaListener implements ReplicaListener {
                     }
 
                     return takeLocksForUpdate(searchRow, rowId, txId)
-                            .thenCompose(rowIdLock -> updateCommand(request, rowId.uuid(), searchRow, txCoordinatorId)
+                            .thenCompose(rowIdLock -> validateAtTimestampAndBuildUpdateCommand(request, rowId.uuid(), searchRow,
+                                        txCoordinatorId
+                                    )
                                     .thenCompose(this::applyUpdateCommand)
                                     .thenApply(v -> {
                                         // Release short term locks.
@@ -2090,13 +2125,15 @@ public class PartitionReplicaListener implements ReplicaListener {
                     }
 
                     return takeLocksForUpdate(searchRow, rowId, txId)
-                            .thenCompose(rowIdLock -> updateCommand(request, rowId.uuid(), searchRow, txCoordinatorId)
+                            .thenCompose(rowIdLock -> validateAtTimestampAndBuildUpdateCommand(request, rowId.uuid(), searchRow,
+                                        txCoordinatorId
+                                    )
                                     .thenCompose(this::applyUpdateCommand)
                                     .thenApply(v -> {
                                         // Release short term locks.
                                         rowIdLock.get2().forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
 
-                                        return true;
+                                        return (Boolean) true;
                                     }));
                 });
             }
@@ -2130,6 +2167,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                     }
 
                     return takeLocksForGet(rowId, txId)
+                            .thenCompose(ignored -> validateAtTimestamp(txId))
                             .thenApply(ignored -> row);
                 });
             }
@@ -2140,9 +2178,9 @@ public class PartitionReplicaListener implements ReplicaListener {
                     }
 
                     return takeLocksForDelete(row, rowId, txId)
-                            .thenCompose(ignored -> updateCommand(request, rowId.uuid(), null, txCoordinatorId))
+                            .thenCompose(rowLock -> validateAtTimestampAndBuildUpdateCommand(request, rowId.uuid(), null, txCoordinatorId))
                             .thenCompose(this::applyUpdateCommand)
-                            .thenApply(ignored -> true);
+                            .thenApply(ignored -> (Boolean) true);
                 });
             }
             case RW_GET_AND_DELETE: {
@@ -2152,7 +2190,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                     }
 
                     return takeLocksForDelete(row, rowId, txId)
-                            .thenCompose(ignored -> updateCommand(request, rowId.uuid(), null, txCoordinatorId))
+                            .thenCompose(ignored -> validateAtTimestampAndBuildUpdateCommand(request, rowId.uuid(), null, txCoordinatorId))
                             .thenCompose(this::applyUpdateCommand)
                             .thenApply(ignored -> row);
                 });
@@ -2310,7 +2348,10 @@ public class PartitionReplicaListener implements ReplicaListener {
      * @param txCoordinatorId Transaction coordinator id.
      * @return Listener response.
      */
-    private CompletableFuture<Boolean> processTwoEntriesAction(ReadWriteSwapRowReplicaRequest request, String txCoordinatorId) {
+    private CompletableFuture<Boolean> processTwoEntriesAction(
+            ReadWriteSwapRowReplicaRequest request,
+            String txCoordinatorId
+    ) {
         BinaryRow newRow = request.binaryRow();
         BinaryRow expectedRow = request.oldBinaryRow();
         TablePartitionIdMessage commitPartitionId = request.commitPartitionId();
@@ -2331,8 +2372,8 @@ public class PartitionReplicaListener implements ReplicaListener {
                                 return completedFuture(false);
                             }
 
-                            return updateCommand(commitPartitionId, validatedRowId.get1().uuid(), newRow, txId, request.full(),
-                                        txCoordinatorId
+                            return validateAtTimestampAndBuildUpdateCommand(commitPartitionId.asTablePartitionId(),
+                                        validatedRowId.get1().uuid(), newRow, txId, request.full(), txCoordinatorId
                                     )
                                     .thenCompose(this::applyUpdateCommand)
                                     .thenApply(ignored -> validatedRowId)
@@ -2340,7 +2381,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                                         // Release short term locks.
                                         rowIdLock.get2().forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
 
-                                        return true;
+                                        return (Boolean) true;
                                     });
                         });
             });
@@ -2524,120 +2565,213 @@ public class PartitionReplicaListener implements ReplicaListener {
                 });
     }
 
-    private CompletableFuture<UpdateCommand> updateCommand(
+    private CompletableFuture<Void> validateAtTimestamp(UUID txId) {
+        HybridTimestamp operationTimestamp = hybridClock.now();
+
+        return schemaSyncService.waitForMetadataCompleteness(operationTimestamp)
+                .thenApply(unused -> {
+                    failIfSchemaChangedSinceTxStart(txId, operationTimestamp);
+
+                    return null;
+                });
+    }
+
+    /**
+     * Chooses an operation timestamp, performs the validations that require it and constructs an {@link UpdateCommand} object.
+     *
+     * @param request Request that is being processed.
+     * @param rowUuid Row UUID.
+     * @param row Row.
+     * @param txCoordinatorId Transaction coordinator id.
+     * @return Future that will complete with the constructed {@link UpdateCommand} object.
+     */
+    private CompletableFuture<UpdateCommand> validateAtTimestampAndBuildUpdateCommand(
             ReadWriteSingleRowReplicaRequest request,
             UUID rowUuid,
             @Nullable BinaryRow row,
             String txCoordinatorId
     ) {
-        return updateCommand(request.commitPartitionId(), rowUuid, row, request.transactionId(), request.full(), txCoordinatorId);
+        return validateAtTimestampAndBuildUpdateCommand(
+                request.commitPartitionId().asTablePartitionId(),
+                rowUuid,
+                row,
+                request.transactionId(),
+                request.full(),
+                txCoordinatorId
+        );
     }
 
-    private CompletableFuture<UpdateCommand> updateCommand(
+    /**
+     * Chooses an operation timestamp, performs the validations that require it and constructs an {@link UpdateCommand} object.
+     *
+     * @param request Request that is being processed.
+     * @param rowUuid Row UUID.
+     * @param row Row.
+     * @param txCoordinatorId Transaction coordinator id.
+     * @return Future that will complete with the constructed {@link UpdateCommand} object.
+     */
+    private CompletableFuture<UpdateCommand> validateAtTimestampAndBuildUpdateCommand(
             ReadWriteSingleRowPkReplicaRequest request,
             UUID rowUuid,
             @Nullable BinaryRow row,
             String txCoordinatorId
     ) {
-        return updateCommand(request.commitPartitionId(), rowUuid, row, request.transactionId(), request.full(), txCoordinatorId);
+        return validateAtTimestampAndBuildUpdateCommand(
+                request.commitPartitionId().asTablePartitionId(),
+                rowUuid,
+                row,
+                request.transactionId(),
+                request.full(),
+                txCoordinatorId
+        );
     }
 
     /**
-     * Method to construct {@link UpdateCommand} object.
+     * Chooses an operation timestamp, performs the validations that require it and constructs an {@link UpdateCommand} object.
      *
-     * @param tablePartId {@link TablePartitionId} object to construct {@link UpdateCommand} object with.
+     * @param tablePartId {@link TablePartitionId} object.
      * @param rowUuid Row UUID.
      * @param row Row.
      * @param txId Transaction ID.
      * @param full {@code True} if this is a full transaction.
      * @param txCoordinatorId Transaction coordinator id.
-     * @return Constructed {@link UpdateCommand} object.
+     * @return Future that will complete with the constructed {@link UpdateCommand} object.
      */
-    private CompletableFuture<UpdateCommand> updateCommand(
-            TablePartitionIdMessage tablePartId,
+    private CompletableFuture<UpdateCommand> validateAtTimestampAndBuildUpdateCommand(
+            TablePartitionId tablePartId,
             UUID rowUuid,
             @Nullable BinaryRow row,
             UUID txId,
             boolean full,
             String txCoordinatorId
     ) {
-        long commandTimestamp = hybridClock.nowLong();
+        HybridTimestamp operationTimestamp = hybridClock.now();
 
-        return catalogVersionFor(hybridTimestamp(commandTimestamp))
+        return reliableCatalogVersionFor(operationTimestamp)
                 .thenApply(catalogVersion -> {
-                    UpdateCommandBuilder bldr = MSG_FACTORY.updateCommand()
-                            .tablePartitionId(tablePartId)
-                            .rowUuid(rowUuid)
-                            .txId(txId)
-                            .full(full)
-                            .safeTimeLong(commandTimestamp)
-                            .txCoordinatorId(txCoordinatorId)
-                            .requiredCatalogVersion(catalogVersion);
+                    failIfSchemaChangedSinceTxStart(txId, operationTimestamp);
 
-                    if (row != null) {
-                        BinaryRowMessage rowMessage = MSG_FACTORY.binaryRowMessage()
-                                .binaryTuple(row.tupleSlice())
-                                .schemaVersion(row.schemaVersion())
-                                .build();
+                    return updateCommand(tablePartId, rowUuid, row, txId, full, txCoordinatorId, operationTimestamp, catalogVersion);
+                });
+    }
 
-                        bldr.rowMessage(rowMessage);
-                    }
+    private static UpdateCommand updateCommand(
+            TablePartitionId tablePartId,
+            UUID rowUuid,
+            @Nullable BinaryRow row,
+            UUID txId,
+            boolean full,
+            String txCoordinatorId,
+            HybridTimestamp operationTimestamp,
+            int catalogVersion
+    ) {
+        UpdateCommandBuilder bldr = MSG_FACTORY.updateCommand()
+                .tablePartitionId(tablePartitionId(tablePartId))
+                .rowUuid(rowUuid)
+                .txId(txId)
+                .full(full)
+                .safeTimeLong(operationTimestamp.longValue())
+                .txCoordinatorId(txCoordinatorId)
+                .requiredCatalogVersion(catalogVersion);
+
+        if (row != null) {
+            BinaryRowMessage rowMessage = MSG_FACTORY.binaryRowMessage()
+                    .binaryTuple(row.tupleSlice())
+                    .schemaVersion(row.schemaVersion())
+                    .build();
+
+            bldr.rowMessage(rowMessage);
+        }
 
-                    return bldr.build();
-                });
+        return bldr.build();
     }
 
-    private CompletableFuture<Integer> catalogVersionFor(HybridTimestamp ts) {
+    private void failIfSchemaChangedSinceTxStart(UUID txId, HybridTimestamp operationTimestamp) {
+        schemaCompatValidator.failIfSchemaChangedAfterTxStart(txId, operationTimestamp, tableId());
+    }
+
+    private CompletableFuture<Integer> reliableCatalogVersionFor(HybridTimestamp ts) {
         return schemaSyncService.waitForMetadataCompleteness(ts)
                 .thenApply(unused -> catalogService.activeCatalogVersion(ts.longValue()));
     }
 
-    private CompletableFuture<UpdateAllCommand> updateAllCommand(
+    /**
+     * Chooses an operation timestamp, performs the validations that require it and constructs an {@link UpdateAllCommand} object.
+     *
+     * @param request Request that is being processed.
+     * @param rowsToUpdate All {@link BinaryRow}s represented as {@link ByteBuffer}s to be updated.
+     * @param txCoordinatorId Transaction coordinator id.
+     * @return Future that will complete with the constructed {@link UpdateAllCommand} object.
+     */
+    private CompletableFuture<UpdateAllCommand> validateAtTimestampAndBuildUpdateAllCommand(
             ReadWriteMultiRowReplicaRequest request,
             Map<UUID, BinaryRowMessage> rowsToUpdate,
             String txCoordinatorId
     ) {
-        return updateAllCommand(rowsToUpdate, request.commitPartitionId(), request.transactionId(), request.full(), txCoordinatorId);
+        return validateAtTimestampAndBuildUpdateAllCommand(
+                rowsToUpdate,
+                request.commitPartitionId(),
+                request.transactionId(),
+                request.full(),
+                txCoordinatorId
+        );
     }
 
-    private CompletableFuture<UpdateAllCommand> updateAllCommand(
+    /**
+     * Chooses an operation timestamp, performs the validations that require it and constructs an {@link UpdateAllCommand} object.
+     *
+     * @param request Request that is being processed.
+     * @param rowsToUpdate All {@link BinaryRow}s represented as {@link ByteBuffer}s to be updated.
+     * @param txCoordinatorId Transaction coordinator id.
+     * @return Future that will complete with the constructed {@link UpdateAllCommand} object.
+     */
+    private CompletableFuture<UpdateAllCommand> validateAtTimestampAndBuildUpdateAllCommand(
             ReadWriteMultiRowPkReplicaRequest request,
             Map<UUID, BinaryRowMessage> rowsToUpdate,
             String txCoordinatorId
     ) {
-        return updateAllCommand(rowsToUpdate, request.commitPartitionId(), request.transactionId(), request.full(), txCoordinatorId);
+        return validateAtTimestampAndBuildUpdateAllCommand(
+                rowsToUpdate,
+                request.commitPartitionId(),
+                request.transactionId(),
+                request.full(),
+                txCoordinatorId
+        );
     }
 
     /**
-     * Method to construct {@link UpdateAllCommand} object.
+     * Chooses an operation timestamp, performs the validations that require it and constructs an {@link UpdateAllCommand} object.
      *
      * @param rowsToUpdate All {@link BinaryRow}s represented as {@link ByteBuffer}s to be updated.
      * @param commitPartitionId Partition ID that these rows belong to.
      * @param transactionId Transaction ID.
      * @param full {@code true} if this is a single-command transaction.
      * @param txCoordinatorId Transaction coordinator id.
-     * @return Constructed {@link UpdateAllCommand} object.
+     * @return Future that will complete with the constructed {@link UpdateAllCommand} object.
      */
-    private CompletableFuture<UpdateAllCommand> updateAllCommand(
+    private CompletableFuture<UpdateAllCommand> validateAtTimestampAndBuildUpdateAllCommand(
             Map<UUID, BinaryRowMessage> rowsToUpdate,
             TablePartitionIdMessage commitPartitionId,
             UUID transactionId,
             boolean full,
             String txCoordinatorId
     ) {
-        long commandTimestamp = hybridClock.nowLong();
-
-        return catalogVersionFor(hybridTimestamp(commandTimestamp))
-                .thenApply(catalogVersion -> MSG_FACTORY.updateAllCommand()
-                        .tablePartitionId(commitPartitionId)
-                        .rowsToUpdate(rowsToUpdate)
-                        .txId(transactionId)
-                        .safeTimeLong(commandTimestamp)
-                        .full(full)
-                        .txCoordinatorId(txCoordinatorId)
-                        .requiredCatalogVersion(catalogVersion)
-                        .build()
-                );
+        HybridTimestamp operationTimestamp = hybridClock.now();
+
+        return reliableCatalogVersionFor(operationTimestamp)
+                .thenApply(catalogVersion -> {
+                    failIfSchemaChangedSinceTxStart(transactionId, operationTimestamp);
+
+                    return MSG_FACTORY.updateAllCommand()
+                            .tablePartitionId(commitPartitionId)
+                            .rowsToUpdate(rowsToUpdate)
+                            .txId(transactionId)
+                            .safeTimeLong(operationTimestamp.longValue())
+                            .full(full)
+                            .txCoordinatorId(txCoordinatorId)
+                            .requiredCatalogVersion(catalogVersion)
+                            .build();
+                });
     }
 
     /**
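
(Illustrative sketch, not part of the patch: taken together, the listener changes above mean every read and write of an RW transaction now re-validates the table schema at its own operation timestamp. A rough client-side view of the effect follows; the table and column names are made up, and the exact exception surfaced to the caller may be wrapped.)

    import org.apache.ignite.client.IgniteClient;
    import org.apache.ignite.table.KeyValueView;
    import org.apache.ignite.table.Tuple;
    import org.apache.ignite.tx.Transaction;

    IgniteClient client = IgniteClient.builder().addresses("127.0.0.1:10800").build();

    Transaction tx = client.transactions().begin();
    KeyValueView<Tuple, Tuple> view = client.tables().table("PERSON").keyValueView();

    view.put(tx, Tuple.create().set("ID", 1), Tuple.create().set("NAME", "n1"));

    // If another session now runs "ALTER TABLE PERSON ADD COLUMN AGE INT", the next
    // operation in this RW transaction is rejected: failIfSchemaChangedSinceTxStart()
    // sees a newer table version than the one active at the transaction's begin
    // timestamp and throws IncompatibleSchemaException.
    view.get(tx, Tuple.create().set("ID", 1));
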
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
index 738c85f3f3..26ef6d8811 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
@@ -36,9 +37,12 @@ import org.jetbrains.annotations.Nullable;
  */
 class SchemaCompatValidator {
     private final Schemas schemas;
+    private final CatalogTables catalogTables;
 
-    SchemaCompatValidator(Schemas schemas) {
+    /** Constructor. */
+    SchemaCompatValidator(Schemas schemas, CatalogTables catalogTables) {
         this.schemas = schemas;
+        this.catalogTables = catalogTables;
     }
 
     /**
@@ -165,4 +169,22 @@ class SchemaCompatValidator {
         // TODO: IGNITE-19229 - is backward compatibility always symmetric with the forward compatibility?
         return isForwardCompatible(newSchema, oldSchema);
     }
+
+    void failIfSchemaChangedAfterTxStart(UUID txId, HybridTimestamp operationTimestamp, int tableId) {
+        HybridTimestamp beginTs = TransactionIds.beginTimestamp(txId);
+        CatalogTableDescriptor tableAtBeginTs = catalogTables.table(tableId, beginTs.longValue());
+        CatalogTableDescriptor tableAtOpTs = catalogTables.table(tableId, operationTimestamp.longValue());
+
+        assert tableAtBeginTs != null;
+        assert tableAtOpTs != null;
+
+        if (tableAtOpTs.tableVersion() != tableAtBeginTs.tableVersion()) {
+            throw new IncompatibleSchemaException(
+                    String.format(
+                            "Table schema was updated after the transaction was started [table=%d, startSchema=%d, operationSchema=%d]",
+                            tableId, tableAtBeginTs.tableVersion(), tableAtOpTs.tableVersion()
+                    )
+            );
+        }
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/action/RequestType.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/action/RequestType.java
index 6baae3a1c5..f5bc723121 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/action/RequestType.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/action/RequestType.java
@@ -57,5 +57,43 @@ public enum RequestType {
 
     RO_GET_ALL,
 
-    RO_SCAN
+    RO_SCAN;
+
+    /**
+     * Returns {@code true} if the operation is an RW read.
+     */
+    public boolean isRwRead() {
+        switch (this) {
+            case RW_GET:
+            case RW_GET_ALL:
+            case RW_SCAN:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    /**
+     * Returns {@code true} if the operation is a write.
+     */
+    public boolean isWrite() {
+        switch (this) {
+            case RW_DELETE:
+            case RW_DELETE_ALL:
+            case RW_DELETE_EXACT:
+            case RW_DELETE_EXACT_ALL:
+            case RW_INSERT:
+            case RW_INSERT_ALL:
+            case RW_UPSERT:
+            case RW_UPSERT_ALL:
+            case RW_REPLACE:
+            case RW_REPLACE_IF_EXIST:
+            case RW_GET_AND_DELETE:
+            case RW_GET_AND_REPLACE:
+            case RW_GET_AND_UPSERT:
+                return true;
+            default:
+                return false;
+        }
+    }
 }
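
(Illustrative sketch, not part of the patch: the file list also adds a RequestTypeTest; the kind of invariant these helpers make easy to check is shown below. The assertion is illustrative, not the actual test.)

    import static org.junit.jupiter.api.Assertions.assertEquals;

    // Every RW_* type must be classified as exactly one of "RW read" or "write"; RO_* types as neither.
    for (RequestType type : RequestType.values()) {
        boolean isRwType = type.name().startsWith("RW_");

        assertEquals(isRwType, type.isRwRead() ^ type.isWrite());
    }
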
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index d63a88a6fd..2aeb6e1fd4 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -26,6 +26,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.hasItem;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -40,6 +42,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import org.apache.ignite.distributed.TestPartitionDataStorage;
 import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -81,6 +84,7 @@ import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
 import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
+import org.apache.ignite.internal.table.distributed.replicator.CatalogTables;
 import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
 import org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
 import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
@@ -194,6 +198,9 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
 
         TestPartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(TEST_MV_PARTITION_STORAGE);
 
+        CatalogTables catalogTables = mock(CatalogTables.class);
+        when(catalogTables.table(anyInt(), anyLong())).thenReturn(mock(CatalogTableDescriptor.class));
+
         ClusterNode localNode = mock(ClusterNode.class);
 
         partitionReplicaListener = new PartitionReplicaListener(
@@ -232,6 +239,7 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
                 mock(IndexBuilder.class),
                 mock(SchemaSyncService.class, invocation -> completedFuture(null)),
                 mock(CatalogService.class),
+                catalogTables,
                 tablesConfig,
                 new TestPlacementDriver(localNode.name())
         );
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index dd69c9e333..460826eadb 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -39,13 +39,17 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.AdditionalMatchers.gt;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -77,6 +81,7 @@ import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.commands.DefaultValue;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -133,6 +138,7 @@ import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replicator.CatalogTables;
 import org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaAbortException;
 import org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaException;
 import org.apache.ignite.internal.table.distributed.replicator.LeaderOrTxState;
@@ -147,6 +153,7 @@ import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.tostring.IgniteToStringInclude;
 import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.tx.LockManager;
+import org.apache.ignite.internal.tx.TransactionIds;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.TxState;
@@ -177,6 +184,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.junitpioneer.jupiter.cartesian.ArgumentSets;
 import org.junitpioneer.jupiter.cartesian.CartesianTest;
 import org.junitpioneer.jupiter.cartesian.CartesianTest.Values;
 import org.mockito.ArgumentCaptor;
@@ -198,7 +206,9 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
     private static final int CURRENT_SCHEMA_VERSION = 1;
 
-    private static final int FUTURE_SCHEMA_VERSION = 2;
+    private static final int NEXT_SCHEMA_VERSION = 2;
+
+    private static final int FUTURE_SCHEMA_VERSION = NEXT_SCHEMA_VERSION;
 
     private static final int FUTURE_SCHEMA_ROW_INDEXED_VALUE = 0;
 
@@ -287,6 +297,9 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
     @Mock
     private CatalogService catalogService;
 
+    @Mock
+    private CatalogTables catalogTables;
+
     /** Schema descriptor for tests. */
     private SchemaDescriptor schemaDescriptor;
 
@@ -299,6 +312,18 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
     /** Key-value marshaller using schema version 2. */
     private KvMarshaller<TestKey, TestValue> kvMarshallerVersion2;
 
+    private final CatalogTableDescriptor tableDescriptor = new CatalogTableDescriptor(
+            tblId, 1, "table", 1, CURRENT_SCHEMA_VERSION,
+            List.of(
+                    new CatalogTableColumnDescriptor("intKey", ColumnType.INT32, false, 0, 0, 0, null),
+                    new CatalogTableColumnDescriptor("strKey", ColumnType.STRING, false, 0, 0, 0, null),
+                    new CatalogTableColumnDescriptor("intVal", ColumnType.INT32, false, 0, 0, 0, null),
+                    new CatalogTableColumnDescriptor("strVal", ColumnType.STRING, false, 0, 0, 0, null)
+            ),
+            List.of("intKey", "strKey"),
+            null
+    );
+
     /** Partition replication listener to test. */
     private PartitionReplicaListener partitionReplicaListener;
 
@@ -382,12 +407,14 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         when(schemas.waitForSchemasAvailability(any())).thenReturn(completedFuture(null));
         when(schemas.waitForSchemaAvailability(anyInt(), anyInt())).thenReturn(completedFuture(null));
 
+        lenient().when(catalogTables.table(anyInt(), anyLong())).thenReturn(tableDescriptor);
+
         int pkIndexId = 1;
         int sortedIndexId = 2;
         int hashIndexId = 3;
 
         schemaDescriptor = schemaDescriptorWith(CURRENT_SCHEMA_VERSION);
-        schemaDescriptorVersion2 = schemaDescriptorWith(FUTURE_SCHEMA_VERSION);
+        schemaDescriptorVersion2 = schemaDescriptorWith(NEXT_SCHEMA_VERSION);
 
         ColumnsExtractor row2Tuple = BinaryRowConverter.keyExtractor(schemaDescriptor);
 
@@ -459,6 +486,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                 mock(IndexBuilder.class),
                 schemaSyncService,
                 catalogService,
+                catalogTables,
                 tablesConfig,
                 new TestPlacementDriver(localNode.name())
         );
@@ -505,7 +533,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         CompletableFuture<?> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateReplicaRequest()
                 .groupId(grpId)
                 .readTimestampLong(clock.nowLong())
-                .txId(TestTransactionIds.newTransactionId())
+                .txId(newTxId())
                 .build(), "senderId");
 
         LeaderOrTxState tuple = (LeaderOrTxState) fut.get(1, TimeUnit.SECONDS);
@@ -516,7 +544,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
     @Test
     public void testTxStateReplicaRequestCommitState() throws Exception {
-        UUID txId = TestTransactionIds.newTransactionId();
+        UUID txId = newTxId();
 
         txStateStorage.put(txId, new TxMeta(TxState.COMMITED, singletonList(grpId), clock.now()));
 
@@ -546,7 +574,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         CompletableFuture<?> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateReplicaRequest()
                 .groupId(grpId)
                 .readTimestampLong(clock.nowLong())
-                .txId(TestTransactionIds.newTransactionId())
+                .txId(newTxId())
                 .build(), localNode.id());
 
         LeaderOrTxState tuple = (LeaderOrTxState) fut.get(1, TimeUnit.SECONDS);
@@ -573,7 +601,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
     @Test
     public void testReadOnlySingleRowReplicaRequestCommittedResult() throws Exception {
-        UUID txId = TestTransactionIds.newTransactionId();
+        UUID txId = newTxId();
         BinaryRow testBinaryKey = nextBinaryKey();
         BinaryRow testBinaryRow = binaryRow(key(testBinaryKey), new TestValue(1, "v1"));
         var rowId = new RowId(partId);
@@ -596,7 +624,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
     @Test
     public void testReadOnlySingleRowReplicaRequestResolveWriteIntentCommitted() throws Exception {
-        UUID txId = TestTransactionIds.newTransactionId();
+        UUID txId = newTxId();
         BinaryRow testBinaryKey = nextBinaryKey();
         BinaryRow testBinaryRow = binaryRow(key(testBinaryKey), new TestValue(1, "v1"));
         var rowId = new RowId(partId);
@@ -619,7 +647,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
     @Test
     public void testReadOnlySingleRowReplicaRequestResolveWriteIntentPending() throws Exception {
-        UUID txId = TestTransactionIds.newTransactionId();
+        UUID txId = newTxId();
         BinaryRow testBinaryKey = nextBinaryKey();
         BinaryRow testBinaryRow = binaryRow(key(testBinaryKey), new TestValue(1, "v1"));
         var rowId = new RowId(partId);
@@ -641,7 +669,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
     @Test
     public void testReadOnlySingleRowReplicaRequestResolveWriteIntentAborted() throws Exception {
-        UUID txId = TestTransactionIds.newTransactionId();
+        UUID txId = newTxId();
         BinaryRow testBinaryKey = nextBinaryKey();
         BinaryRow testBinaryRow = binaryRow(key(testBinaryKey), new TestValue(1, "v1"));
         var rowId = new RowId(partId);
@@ -664,7 +692,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
     @Test
     public void testWriteScanRetrieveBatchReplicaRequestWithSortedIndex() throws Exception {
-        UUID txId = TestTransactionIds.newTransactionId();
+        UUID txId = newTxId();
         int sortedIndexId = sortedIndexStorage.id();
 
         IntStream.range(0, 6).forEach(i -> {
@@ -681,7 +709,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
             testMvPartitionStorage.commitWrite(rowId, clock.now());
         });
 
-        UUID scanTxId = TestTransactionIds.newTransactionId();
+        UUID scanTxId = newTxId();
 
         // Request first batch
         CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
@@ -718,7 +746,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         // Request bounded.
         fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                 .groupId(grpId)
-                .transactionId(TestTransactionIds.newTransactionId())
+                .transactionId(newTxId())
                 .timestampLong(clock.nowLong())
                 .term(1L)
                 .scanId(2L)
@@ -737,7 +765,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         // Empty result.
         fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                 .groupId(grpId)
-                .transactionId(TestTransactionIds.newTransactionId())
+                .transactionId(newTxId())
                 .timestampLong(clock.nowLong())
                 .term(1L)
                 .scanId(2L)
@@ -754,7 +782,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         // Lookup.
         fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                 .groupId(grpId)
-                .transactionId(TestTransactionIds.newTransactionId())
+                .transactionId(newTxId())
                 .timestampLong(clock.nowLong())
                 .term(1L)
                 .scanId(2L)
@@ -771,7 +799,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
     @Test
     public void testReadOnlyScanRetrieveBatchReplicaRequestSortedIndex() throws Exception {
-        UUID txId = TestTransactionIds.newTransactionId();
+        UUID txId = newTxId();
         int sortedIndexId = sortedIndexStorage.id();
 
         IntStream.range(0, 6).forEach(i -> {
@@ -788,7 +816,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
             testMvPartitionStorage.commitWrite(rowId, clock.now());
         });
 
-        UUID scanTxId = TestTransactionIds.newTransactionId();
+        UUID scanTxId = newTxId();
 
         // Request first batch
         CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
@@ -823,7 +851,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         // Request bounded.
         fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                 .groupId(grpId)
-                .transactionId(TestTransactionIds.newTransactionId())
+                .transactionId(newTxId())
                 .readTimestampLong(clock.nowLong())
                 .scanId(2L)
                 .indexToUse(sortedIndexId)
@@ -841,7 +869,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         // Empty result.
         fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                 .groupId(grpId)
-                .transactionId(TestTransactionIds.newTransactionId())
+                .transactionId(newTxId())
                 .readTimestampLong(clock.nowLong())
                 .scanId(2L)
                 .indexToUse(sortedIndexId)
@@ -857,7 +885,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         // Lookup.
         fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                 .groupId(grpId)
-                .transactionId(TestTransactionIds.newTransactionId())
+                .transactionId(newTxId())
                 .readTimestampLong(clock.nowLong())
                 .scanId(2L)
                 .indexToUse(sortedIndexId)
@@ -873,7 +901,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
     @Test
     public void testReadOnlyScanRetrieveBatchReplicaRequstHashIndex() throws Exception {
-        UUID txId = TestTransactionIds.newTransactionId();
+        UUID txId = newTxId();
         int hashIndexId = hashIndexStorage.id();
 
         IntStream.range(0, 7).forEach(i -> {
@@ -890,7 +918,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
             testMvPartitionStorage.commitWrite(rowId, clock.now());
         });
 
-        UUID scanTxId = TestTransactionIds.newTransactionId();
+        UUID scanTxId = newTxId();
 
         // Request first batch
         CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
@@ -927,7 +955,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         // Empty result.
         fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                 .groupId(grpId)
-                .transactionId(TestTransactionIds.newTransactionId())
+                .transactionId(newTxId())
                 .readTimestampLong(clock.nowLong())
                 .scanId(2L)
                 .indexToUse(hashIndexId)
@@ -943,7 +971,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         // Lookup.
         fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                 .groupId(grpId)
-                .transactionId(TestTransactionIds.newTransactionId())
+                .transactionId(newTxId())
                 .readTimestampLong(clock.nowLong())
                 .scanId(2L)
                 .indexToUse(hashIndexId)
@@ -959,7 +987,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
     @Test
     public void testWriteIntentOnPrimaryReplicaInsertUpdateDelete() throws MarshallerException {
-        UUID txId = TestTransactionIds.newTransactionId();
+        UUID txId = newTxId();
 
         BinaryRow testRow = binaryRow(0);
         BinaryRow testRowPk = kvMarshaller.marshal(new TestKey(0, "k0"));
@@ -1011,7 +1039,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
     @Test
     public void testWriteIntentOnPrimaryReplicaMultiRowOps() throws MarshallerException {
-        UUID txId = TestTransactionIds.newTransactionId();
+        UUID txId = newTxId();
         BinaryRow row0 = binaryRow(0);
         BinaryRow row1 = binaryRow(1);
         Collection<BinaryRow> rows = asList(row0, row1);
@@ -1058,6 +1086,10 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
     }
 
     private CompletableFuture<?> doSingleRowRequest(UUID txId, BinaryRow binaryRow, RequestType requestType) {
+        return doSingleRowRequest(txId, binaryRow, requestType, false);
+    }
+
+    private CompletableFuture<?> doSingleRowRequest(UUID txId, BinaryRow binaryRow, RequestType requestType, boolean full) {
         return partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
                     .groupId(grpId)
                     .transactionId(txId)
@@ -1065,6 +1097,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                     .binaryRowMessage(binaryRowMessage(binaryRow))
                     .term(1L)
                     .commitPartitionId(commitPartitionId())
+                    .full(full)
                     .build(),
                 localNode.id()
         );
@@ -1091,6 +1124,10 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
     }
 
     private CompletableFuture<?> doMultiRowRequest(UUID txId, Collection<BinaryRow> binaryRows, RequestType requestType) {
+        return doMultiRowRequest(txId, binaryRows, requestType, false);
+    }
+
+    private CompletableFuture<?> doMultiRowRequest(UUID txId, Collection<BinaryRow> binaryRows, RequestType requestType, boolean full) {
         return partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
                     .groupId(grpId)
                     .transactionId(txId)
@@ -1098,12 +1135,17 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                     .binaryRowMessages(binaryRows.stream().map(PartitionReplicaListenerTest::binaryRowMessage).collect(toList()))
                     .term(1L)
                     .commitPartitionId(commitPartitionId())
+                    .full(full)
                     .build(),
                 localNode.id()
         );
     }
 
     private CompletableFuture<?> doMultiRowPkRequest(UUID txId, Collection<BinaryRow> binaryRows, RequestType requestType) {
+        return doMultiRowPkRequest(txId, binaryRows, requestType, false);
+    }
+
+    private CompletableFuture<?> doMultiRowPkRequest(UUID txId, Collection<BinaryRow> binaryRows, RequestType requestType, boolean full) {
         return partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteMultiRowPkReplicaRequest()
                         .groupId(grpId)
                         .transactionId(txId)
@@ -1111,6 +1153,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                         .primaryKeys(binaryRows.stream().map(BinaryRow::tupleSlice).collect(toList()))
                         .term(1L)
                         .commitPartitionId(commitPartitionId())
+                        .full(full)
                         .build(),
                 localNode.id()
         );
@@ -1118,7 +1161,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
     @Test
     public void testWriteIntentOnPrimaryReplicaSingleUpdate() {
-        UUID txId = TestTransactionIds.newTransactionId();
+        UUID txId = newTxId();
         AtomicInteger counter = new AtomicInteger();
 
         testWriteIntentOnPrimaryReplica(
@@ -1143,7 +1186,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
     @Test
     public void testWriteIntentOnPrimaryReplicaUpdateAll() {
-        UUID txId = TestTransactionIds.newTransactionId();
+        UUID txId = newTxId();
         AtomicInteger counter = new AtomicInteger();
 
         testWriteIntentOnPrimaryReplica(
@@ -1278,7 +1321,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         BinaryRow br2Pk = kvMarshaller.marshal(new TestKey(2, "k" + 2));
 
         if (insertFirst) {
-            UUID tx0 = beginTx();
+            UUID tx0 = newTxId();
             upsert(tx0, br1);
             upsert(tx0, br2);
             cleanup(tx0);
@@ -1286,7 +1329,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
         txState = null;
 
-        UUID tx1 = beginTx();
+        UUID tx1 = newTxId();
         delete(tx1, br1Pk);
         upsert(tx1, br1);
 
@@ -1540,7 +1583,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
     private static Stream<Arguments> singleRowRequestTypes() {
         return Arrays.stream(RequestType.values())
-                .filter(RequestTypes::isSingleRow)
+                .filter(RequestTypes::isSingleRowRw)
                 .map(Arguments::of);
     }
 
@@ -1592,7 +1635,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
     private static Stream<Arguments> multiRowsRequestTypes() {
         return Arrays.stream(RequestType.values())
-                .filter(RequestTypes::isMultipleRows)
+                .filter(RequestTypes::isMultipleRowsRw)
                 .map(Arguments::of);
     }
 
@@ -1608,6 +1651,10 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
     }
 
     private CompletableFuture<?> doReplaceRequest(UUID targetTxId, BinaryRow oldRow, BinaryRow newRow) {
+        return doReplaceRequest(targetTxId, oldRow, newRow, false);
+    }
+
+    private CompletableFuture<?> doReplaceRequest(UUID targetTxId, BinaryRow oldRow, BinaryRow newRow, boolean full) {
         return partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteSwapRowReplicaRequest()
                     .groupId(grpId)
                     .transactionId(targetTxId)
@@ -1616,6 +1663,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                     .binaryRowMessage(binaryRowMessage(newRow))
                     .term(1L)
                     .commitPartitionId(commitPartitionId())
+                    .full(full)
                     .build(),
                 localNode.id()
         );
@@ -1659,16 +1707,21 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
     @Test
     public void failsWhenFullScanReadsTupleWithIncompatibleSchemaFromFuture() {
         testFailsWhenReadingFromFutureIncompatibleSchema(
-                (targetTxId, key) -> partitionReplicaListener.invoke(
-                        TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
-                            .groupId(grpId)
-                            .transactionId(targetTxId)
-                            .term(1L)
-                            .scanId(1)
-                            .batchSize(100)
-                            .build(),
-                        localNode.id()
-                )
+                (targetTxId, key) -> doRwFullScanRetrieveBatchRequest(targetTxId, false)
+        );
+    }
+
+    private CompletableFuture<?> doRwFullScanRetrieveBatchRequest(UUID targetTxId, boolean full) {
+        return partitionReplicaListener.invoke(
+                TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
+                        .groupId(grpId)
+                        .transactionId(targetTxId)
+                        .term(1L)
+                        .scanId(1)
+                        .batchSize(100)
+                        .full(full)
+                        .build(),
+                localNode.id()
         );
     }
 
@@ -1698,9 +1751,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         TestKey key = nextKey();
 
         if (RequestTypes.looksUpFirst(requestType)) {
-            UUID tx0 = beginTx();
-            upsert(tx0, binaryRow(key, someValue));
-            cleanup(tx0);
+            upsertInNewTxFor(key);
 
             // While handling the upsert, our mocks were touched, let's reset them to prevent false-positives during verification.
             Mockito.reset(schemaSyncService);
@@ -1708,7 +1759,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
         when(catalogService.activeCatalogVersion(anyLong())).thenReturn(42);
 
-        UUID targetTxId = beginTx();
+        UUID targetTxId = newTxId();
 
         CompletableFuture<?> future = listenerInvocation.invoke(targetTxId, key);
 
@@ -1730,6 +1781,12 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         assertThat(catalogVersionAware.requiredCatalogVersion(), is(42));
     }
 
+    private void upsertInNewTxFor(TestKey key) {
+        UUID tx0 = newTxId();
+        upsert(tx0, binaryRow(key, someValue));
+        cleanup(tx0);
+    }
+
     @Test
     public void replaceRequestIsSuppliedWithRequiredCatalogVersion() {
         testWritesAreSuppliedWithRequiredCatalogVersion(RequestType.RW_REPLACE, (targetTxId, key) -> {
@@ -1763,8 +1820,146 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                 .map(Arguments::of);
     }
 
-    private static UUID beginTx() {
-        return TestTransactionIds.newTransactionId();
+    @CartesianTest
+    @CartesianTest.MethodFactory("singleRowRwOperationTypesFactory")
+    void singleRowRwOperationsFailIfTableAlteredAfterTxStart(
+            RequestType requestType,
+            boolean onExistingRow,
+            boolean full
+    ) {
+        ListenerInvocation invocation = null;
+
+        if (RequestTypes.isSingleRowRwPkOnly(requestType)) {
+            invocation = (targetTxId, key) -> {
+                return doSingleRowPkRequest(targetTxId, marshalKeyOrKeyValue(requestType, key), requestType, full);
+            };
+        } else if (RequestTypes.isSingleRowRwFullRow(requestType)) {
+            invocation = (targetTxId, key) -> {
+                return doSingleRowRequest(targetTxId, marshalKeyOrKeyValue(requestType, key), requestType, full);
+            };
+        } else {
+            fail("Uncovered type: " + requestType);
+        }
+
+        testRwOperationsFailIfTableAlteredAfterTxStart(requestType, onExistingRow, invocation);
+    }
+
+    @SuppressWarnings("unused")
+    private static ArgumentSets singleRowRwOperationTypesFactory() {
+        return ArgumentSets.argumentsForFirstParameter(singleRowRwOperationTypes())
+                .argumentsForNextParameter(false, true)
+                .argumentsForNextParameter(false, true);
+    }
+
+    private static Stream<RequestType> singleRowRwOperationTypes() {
+        return Arrays.stream(RequestType.values())
+                .filter(RequestTypes::isSingleRowRw);
+    }
+
+    private void testRwOperationsFailIfTableAlteredAfterTxStart(
+            RequestType requestType,
+            boolean onExistingRow,
+            ListenerInvocation listenerInvocation
+    ) {
+        TestKey key = nextKey();
+
+        if (onExistingRow) {
+            upsertInNewTxFor(key);
+        }
+
+        UUID txId = newTxId();
+        HybridTimestamp txBeginTs = TransactionIds.beginTimestamp(txId);
+
+        CatalogTableDescriptor tableVersion1 = mock(CatalogTableDescriptor.class);
+        CatalogTableDescriptor tableVersion2 = mock(CatalogTableDescriptor.class);
+        when(tableVersion1.tableVersion()).thenReturn(CURRENT_SCHEMA_VERSION);
+        when(tableVersion2.tableVersion()).thenReturn(NEXT_SCHEMA_VERSION);
+
+        when(catalogTables.table(tblId, txBeginTs.longValue())).thenReturn(tableVersion1);
+        when(catalogTables.table(eq(tblId), gt(txBeginTs.longValue()))).thenReturn(tableVersion2);
+
+        CompletableFuture<?> future = listenerInvocation.invoke(txId, key);
+
+        boolean expectValidationFailure;
+        if (RequestTypes.neverMisses(requestType)) {
+            expectValidationFailure = true;
+        } else {
+            expectValidationFailure = onExistingRow == RequestTypes.writesIfKeyDoesNotExist(requestType);
+        }
+
+        if (expectValidationFailure) {
+            IncompatibleSchemaException ex = assertWillThrowFast(future, IncompatibleSchemaException.class);
+            assertThat(ex.code(), is(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR));
+            assertThat(
+                    ex.getMessage(),
+                    is("Table schema was updated after the transaction was started [table=1, startSchema=1, operationSchema=2]")
+            );
+        } else {
+            assertThat(future, willCompleteSuccessfully());
+        }
+    }
+
+    @CartesianTest
+    @CartesianTest.MethodFactory("multiRowRwOperationTypesFactory")
+    void multiRowRwOperationsFailIfTableAlteredAfterTxStart(
+            RequestType requestType, boolean onExistingRow, boolean full
+    ) {
+        ListenerInvocation invocation = null;
+
+        if (RequestTypes.isMultipleRowsRwPkOnly(requestType)) {
+            invocation = (targetTxId, key) -> {
+                return doMultiRowPkRequest(targetTxId, List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full);
+            };
+        } else if (RequestTypes.isMultipleRowsRwFullRows(requestType)) {
+            invocation = (targetTxId, key) -> {
+                return doMultiRowRequest(targetTxId, List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full);
+            };
+        } else {
+            fail("Uncovered type: " + requestType);
+        }
+
+        testRwOperationsFailIfTableAlteredAfterTxStart(requestType, onExistingRow, invocation);
+    }
+
+    @SuppressWarnings("unused")
+    private static ArgumentSets multiRowRwOperationTypesFactory() {
+        return ArgumentSets.argumentsForFirstParameter(multiRowRwOperationTypes())
+                .argumentsForNextParameter(false, true)
+                .argumentsForNextParameter(false, true);
+    }
+
+    private static Stream<RequestType> multiRowRwOperationTypes() {
+        return Arrays.stream(RequestType.values())
+                .filter(RequestTypes::isMultipleRowsRw);
+    }
+
+    @CartesianTest
+    void replaceRequestFailsIfTableAlteredAfterTxStart(
+            @Values(booleans = {false, true}) boolean onExistingRow,
+            @Values(booleans = {false, true}) boolean full
+    ) {
+        testRwOperationsFailIfTableAlteredAfterTxStart(RequestType.RW_REPLACE, onExistingRow, (targetTxId, key) -> {
+            return doReplaceRequest(
+                    targetTxId,
+                    marshalKeyOrKeyValue(RequestType.RW_REPLACE, key),
+                    marshalKeyOrKeyValue(RequestType.RW_REPLACE, key),
+                    full
+            );
+        });
+    }
+
+    @CartesianTest
+    void rwScanRequestFailsIfTableAlteredAfterTxStart(
+            @Values(booleans = {false, true}) boolean onExistingRow,
+            @Values(booleans = {false, true}) boolean full
+    ) {
+        testRwOperationsFailIfTableAlteredAfterTxStart(RequestType.RW_SCAN, onExistingRow, (targetTxId, key) -> {
+            return doRwFullScanRetrieveBatchRequest(targetTxId, full);
+        });
+    }
+
+    private UUID newTxId() {
+        return transactionIdFor(clock.now());
     }
 
     private void upsert(UUID txId, BinaryRow row) {
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replicator/action/RequestTypeTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replicator/action/RequestTypeTest.java
new file mode 100644
index 0000000000..89b8767ed8
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replicator/action/RequestTypeTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator.action;
+
+
+import static org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RO_GET;
+import static org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RO_GET_ALL;
+import static org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RO_SCAN;
+import static org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_GET;
+import static org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_GET_ALL;
+import static org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_SCAN;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.EnumSet;
+import java.util.Set;
+import org.junit.jupiter.api.Test;
+
+class RequestTypeTest {
+    @Test
+    void isRwReadWorksAsExpected() {
+        Set<RequestType> expectedRwReads = EnumSet.of(RW_GET, RW_GET_ALL, RW_SCAN);
+
+        for (RequestType requestType : RequestType.values()) {
+            if (expectedRwReads.contains(requestType)) {
+                assertTrue(requestType.isRwRead(), requestType + " must be an RW read");
+            } else {
+                assertFalse(requestType.isRwRead(), requestType + " must not be an RW read");
+            }
+        }
+    }
+
+    @Test
+    void isWriteWorksAsExpected() {
+        Set<RequestType> expectedNonWrites = EnumSet.of(RW_GET, RW_GET_ALL, RW_SCAN, RO_GET, RO_GET_ALL, RO_SCAN);
+
+        for (RequestType requestType : RequestType.values()) {
+            if (expectedNonWrites.contains(requestType)) {
+                assertFalse(requestType.isWrite(), requestType + " must not be a write");
+            } else {
+                assertTrue(requestType.isWrite(), requestType + " must be a write");
+            }
+        }
+    }
+}
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 4beff4a5d4..e69a6fdf90 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -26,6 +26,9 @@ import static org.apache.ignite.utils.ClusterServiceTestUtils.waitForTopology;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -47,6 +50,7 @@ import java.util.stream.IntStream;
 import org.apache.ignite.internal.affinity.AffinityUtils;
 import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
@@ -93,6 +97,7 @@ import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.table.distributed.replicator.CatalogTables;
 import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
 import org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
 import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
@@ -463,6 +468,10 @@ public class ItTxTestCluster {
                         raftSvc -> {
                             try {
                                 DummySchemaManagerImpl schemaManager = new DummySchemaManagerImpl(schemaDescriptor);
+
+                                CatalogTables catalogTables = mock(CatalogTables.class);
+                                lenient().when(catalogTables.table(anyInt(), anyLong())).thenReturn(mock(CatalogTableDescriptor.class));
+
                                 replicaManagers.get(assignment).startReplica(
                                         new TablePartitionId(tableId, partId),
                                         completedFuture(null),
@@ -488,6 +497,7 @@ public class ItTxTestCluster {
                                                 mock(IndexBuilder.class),
                                                 mock(SchemaSyncService.class, invocation -> completedFuture(null)),
                                                 mock(CatalogService.class),
+                                                catalogTables,
                                                 tablesConfig,
                                                 placementDriver
                                         ),
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/replicator/action/RequestTypes.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/replicator/action/RequestTypes.java
index 1697dd1a92..737639b27a 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/replicator/action/RequestTypes.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/replicator/action/RequestTypes.java
@@ -24,13 +24,24 @@ import org.apache.ignite.internal.table.distributed.replicator.action.RequestTyp
  */
 public class RequestTypes {
     /**
-     * Returns {@code true} if the operation works with a single row.
+     * Returns {@code true} if the operation works with a single PK and it's RW.
      */
-    public static boolean isSingleRow(RequestType type) {
+    public static boolean isSingleRowRwPkOnly(RequestType type) {
         switch (type) {
             case RW_GET:
             case RW_DELETE:
             case RW_GET_AND_DELETE:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    /**
+     * Returns {@code true} if the operation works with a single row (full row) and it's RW.
+     */
+    public static boolean isSingleRowRwFullRow(RequestType type) {
+        switch (type) {
             case RW_DELETE_EXACT:
             case RW_INSERT:
             case RW_UPSERT:
@@ -43,20 +54,38 @@ public class RequestTypes {
         }
     }
 
+    /**
+     * Returns {@code true} if the operation works with a single row and it's RW.
+     */
+    public static boolean isSingleRowRw(RequestType type) {
+        return isSingleRowRwPkOnly(type) || isSingleRowRwFullRow(type);
+    }
+
     /**
      * Returns {@code true} if the operation works with a single row and it's a write.
      */
     public static boolean isSingleRowWrite(RequestType type) {
-        return isSingleRow(type) && type != RequestType.RW_GET;
+        return isSingleRowRw(type) && type.isWrite();
     }
 
     /**
-     * Returns {@code true} if the operation works with multiple rows.
+     * Returns {@code true} if the operation works with multiple PKs and it's RW.
      */
-    public static boolean isMultipleRows(RequestType type) {
+    public static boolean isMultipleRowsRwPkOnly(RequestType type) {
         switch (type) {
             case RW_GET_ALL:
             case RW_DELETE_ALL:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    /**
+     * Returns {@code true} if the operation works with multiple rows (full rows) and it's RW.
+     */
+    public static boolean isMultipleRowsRwFullRows(RequestType type) {
+        switch (type) {
             case RW_DELETE_EXACT_ALL:
             case RW_INSERT_ALL:
             case RW_UPSERT_ALL:
@@ -67,10 +96,17 @@ public class RequestTypes {
     }
 
     /**
-     * Returns {@code true} if the operation works with multiple rows.
+     * Returns {@code true} if the operation works with multiple rows and it's RW.
+     */
+    public static boolean isMultipleRowsRw(RequestType type) {
+        return isMultipleRowsRwPkOnly(type) || isMultipleRowsRwFullRows(type);
+    }
+
+    /**
+     * Returns {@code true} if the operation works with multiple rows and it is a write.
      */
     public static boolean isMultipleRowsWrite(RequestType type) {
-        return isMultipleRows(type) && type != RequestType.RW_GET_ALL;
+        return isMultipleRowsRw(type) && type.isWrite();
     }
 
     /**
@@ -101,4 +137,32 @@ public class RequestTypes {
                 return true;
         }
     }
+
+    /**
+     * Returns {@code true} if the operation only makes a write if the corresponding key does not have a value yet in the table.
+     */
+    public static boolean writesIfKeyDoesNotExist(RequestType type) {
+        switch (type) {
+            case RW_INSERT:
+            case RW_INSERT_ALL:
+                return false;
+            default:
+                return true;
+        }
+    }
+
+    /**
+     * Returns {@code true} if the operation always reads or writes something, regardless of whether there is something under the key
+     * in the table or not.
+     */
+    public static boolean neverMisses(RequestType type) {
+        switch (type) {
+            case RW_UPSERT:
+            case RW_UPSERT_ALL:
+            case RW_GET_AND_UPSERT:
+                return true;
+            default:
+                return false;
+        }
+    }
 }
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 3a4fe1692b..a777af6887 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.table.impl;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
@@ -42,6 +44,7 @@ import org.apache.ignite.configuration.NamedListView;
 import org.apache.ignite.distributed.TestPartitionDataStorage;
 import org.apache.ignite.internal.TestHybridClock;
 import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.configuration.NamedListConfiguration;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -85,6 +88,7 @@ import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.table.distributed.replicator.CatalogTables;
 import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
 import org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
 import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
@@ -342,6 +346,12 @@ public class DummyInternalTableImpl extends InternalTableImpl {
 
         DummySchemaManagerImpl schemaManager = new DummySchemaManagerImpl(schema);
 
+        CatalogTables catalogTables = mock(CatalogTables.class);
+        CatalogTableDescriptor tableDescriptor = mock(CatalogTableDescriptor.class);
+
+        lenient().when(catalogTables.table(anyInt(), anyLong())).thenReturn(tableDescriptor);
+        lenient().when(tableDescriptor.tableVersion()).thenReturn(1);
+
         TablesConfiguration tablesConfig = mock(TablesConfiguration.class);
         NamedConfigurationTree<TableConfiguration, TableView, TableChange> tablesTree = mock(NamedListConfiguration.class);
         NamedListView<TableView> tablesList = mock(NamedListView.class);
@@ -380,6 +390,7 @@ public class DummyInternalTableImpl extends InternalTableImpl {
                 mock(IndexBuilder.class),
                 mock(SchemaSyncService.class, invocation -> completedFuture(null)),
                 mock(CatalogService.class),
+                catalogTables,
                 tablesConfig,
                 new TestPlacementDriver(LOCAL_NODE.name())
         );