You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by zs...@apache.org on 2022/10/19 08:43:43 UTC

[ignite-3] branch main updated: IGNITE-17702 Move schema changes history from configuration to metastore - Fixes #1134.

This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7400742500 IGNITE-17702 Move schema changes history from configuration to metastore - Fixes #1134.
7400742500 is described below

commit 74007425000255c3a65bfbe4bd36d73537488d3a
Author: zstan <st...@gmail.com>
AuthorDate: Wed Oct 19 11:39:54 2022 +0300

    IGNITE-17702 Move schema changes history from configuration to metastore - Fixes #1134.
    
    Signed-off-by: zstan <st...@gmail.com>
---
 .../ignite/internal/index/IndexManagerTest.java    |   2 +-
 .../storage/ItRebalanceDistributedTest.java        |  50 +++-
 .../runner/app/AbstractSchemaChangeTest.java       |  42 +--
 .../internal/runner/app/ItDataSchemaSyncTest.java  | 113 +++++++-
 .../runner/app/ItIgniteNodeRestartTest.java        |   2 +-
 .../runner/app/ItTableApiContractTest.java         |  38 +--
 .../internal/runner/app/ItTablesApiTest.java       |  17 +-
 .../internal/sql/internal/InternalSchemaTest.java  | 123 +++++++++
 .../internal/test/WatchListenerInhibitor.java      |   9 +
 .../org/apache/ignite/internal/app/IgniteImpl.java |   2 +-
 modules/schema/build.gradle                        |   1 +
 modules/schema/pom.xml                             |   5 +
 .../ignite/internal/schema/SchemaManager.java      | 290 +++++++++++++++------
 .../ignite/internal/schema/SchemaRegistry.java     |   2 +-
 .../apache/ignite/internal/schema/SchemaUtils.java |  40 +--
 .../ExtendedTableConfigurationSchema.java          |   9 +-
 .../configuration/SchemaConfigurationSchema.java   |  31 ---
 .../schema/registry/SchemaRegistryImpl.java        |  11 +-
 .../schema/registry/UpgradingRowAdapter.java       |   2 +-
 .../schema/registry/SchemaRegistryImplTest.java    |  11 +-
 .../schema/registry/UpgradingRowAdapterTest.java   |   7 +-
 .../sql/engine/exec/ddl/DdlCommandHandler.java     | 111 ++++----
 .../internal/sql/engine/StopCalciteModuleTest.java |   2 +-
 .../sql/engine/exec/MockedStructuresTest.java      | 289 +-------------------
 .../internal/table/distributed/TableManager.java   |  88 ++-----
 .../table/distributed/TableManagerTest.java        |  50 ++--
 26 files changed, 717 insertions(+), 630 deletions(-)

diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
index c8f183c317..e7e540cd9d 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
@@ -127,7 +127,7 @@ public class IndexManagerTest {
 
             chg.changePrimaryKey(pk -> pk.changeColumns("c1").changeColocationColumns("c1"));
 
-            ((ExtendedTableChange) chg).changeAssignments((byte) 1);
+            ((ExtendedTableChange) chg).changeAssignments((byte) 1).changeSchemaId(1);
         })).get();
     }
 
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index a952cd39a5..b98a97ba87 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -175,7 +175,10 @@ public class ItRebalanceDistributedTest {
         assertEquals(1, nodes.get(0).clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
                 .tables().get("TBL1").replicas().value());
 
-        await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> ch.changeReplicas(2)));
+        await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> {
+            ch.changeReplicas(2);
+            return true;
+        }));
 
         waitPartitionAssignmentsSyncedToExpected(0, 2);
 
@@ -200,8 +203,15 @@ public class ItRebalanceDistributedTest {
         assertEquals(1, nodes.get(0).clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables()
                 .get("TBL1").replicas().value());
 
-        await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> ch.changeReplicas(2)));
-        await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> ch.changeReplicas(3)));
+        await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> {
+            ch.changeReplicas(2);
+            return true;
+        }));
+
+        await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> {
+            ch.changeReplicas(3);
+            return true;
+        }));
 
         waitPartitionAssignmentsSyncedToExpected(0, 3);
 
@@ -226,9 +236,20 @@ public class ItRebalanceDistributedTest {
         assertEquals(1, nodes.get(0).clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables()
                 .get("TBL1").replicas().value());
 
-        await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> ch.changeReplicas(2)));
-        await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> ch.changeReplicas(3)));
-        await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> ch.changeReplicas(2)));
+        await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> {
+            ch.changeReplicas(2);
+            return true;
+        }));
+
+        await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> {
+            ch.changeReplicas(3);
+            return true;
+        }));
+
+        await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> {
+            ch.changeReplicas(2);
+            return true;
+        }));
 
         waitPartitionAssignmentsSyncedToExpected(0, 2);
 
@@ -277,7 +298,10 @@ public class ItRebalanceDistributedTest {
                     return false;
                 });
 
-        await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> ch.changeReplicas(3)));
+        await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> {
+            ch.changeReplicas(3);
+            return true;
+        }));
 
         countDownLatch.await();
 
@@ -308,7 +332,10 @@ public class ItRebalanceDistributedTest {
         assertEquals(1, nodes.get(0).clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
                 .tables().get("TBL1").replicas().value());
 
-        await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> ch.changeReplicas(1)));
+        await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> {
+            ch.changeReplicas(1);
+            return true;
+        }));
 
         waitPartitionAssignmentsSyncedToExpected(0, 1);
 
@@ -331,7 +358,10 @@ public class ItRebalanceDistributedTest {
             return false;
         });
 
-        await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> ch.changeReplicas(3)));
+        await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> {
+            ch.changeReplicas(3);
+            return true;
+        }));
 
         waitPartitionAssignmentsSyncedToExpected(0, 3);
 
@@ -506,7 +536,7 @@ public class ItRebalanceDistributedTest {
                     metaStorageManager,
                     clusterService);
 
-            schemaManager = new SchemaManager(registry, tablesCfg);
+            schemaManager = new SchemaManager(registry, tablesCfg, metaStorageManager);
 
             tableManager = new TableManager(
                     name,
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java
index ee13ec017b..635322e8e2 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java
@@ -232,11 +232,12 @@ abstract class AbstractSchemaChangeTest {
      */
     protected static void renameColumn(List<Ignite> nodes, String oldName, String newName) {
         await(((TableManager) nodes.get(0).tables()).alterTableAsync(TABLE,
-                tblChanger -> tblChanger.changeColumns(
-                        colListChanger -> colListChanger
-                                .rename(IgniteNameUtils.parseSimpleName(oldName), IgniteNameUtils.parseSimpleName(newName))
-                )
-        ));
+                tblChanger -> {
+                    tblChanger.changeColumns(
+                            colListChanger -> colListChanger
+                                .rename(IgniteNameUtils.parseSimpleName(oldName), IgniteNameUtils.parseSimpleName(newName)));
+                    return true;
+            }));
     }
 
     /**
@@ -247,16 +248,16 @@ abstract class AbstractSchemaChangeTest {
      * @param defSup Default value supplier.
      */
     protected static void changeDefault(List<Ignite> nodes, String colName, Supplier<Object> defSup) {
-        await(((TableManager) nodes.get(0).tables()).alterTableAsync(TABLE,
-                tblChanger -> tblChanger.changeColumns(
-                        colListChanger -> colListChanger
-                                .update(
-                                        IgniteNameUtils.parseSimpleName(colName),
-                                        colChanger -> colChanger.changeDefaultValueProvider(colDefChange -> colDefChange.convert(
-                                                ConstantValueDefaultChange.class).changeDefaultValue(defSup.get().toString()))
-                                )
-                )
-        ));
+        await(((TableManager) nodes.get(0).tables()).alterTableAsync(TABLE, tblChanger -> {
+            tblChanger.changeColumns(
+                    colListChanger -> colListChanger
+                            .update(
+                                    IgniteNameUtils.parseSimpleName(colName),
+                                    colChanger -> colChanger.changeDefaultValueProvider(colDefChange -> colDefChange.convert(
+                                            ConstantValueDefaultChange.class).changeDefaultValue(defSup.get().toString()))
+                            ));
+            return true;
+        }));
     }
 
     /**
@@ -268,11 +269,12 @@ abstract class AbstractSchemaChangeTest {
      */
     private static void assertColumnChangeFailed(List<Ignite> grid, String colName, Consumer<ColumnChange> colChanger) {
         assertThrows(IgniteException.class, () ->
-                await(((TableManager) grid.get(0).tables()).alterTableAsync(TABLE,
-                        tblChanger -> tblChanger.changeColumns(
-                                listChanger -> listChanger.update(IgniteNameUtils.parseSimpleName(colName), colChanger)
-                        )
-                ))
+                await(((TableManager) grid.get(0).tables()).alterTableAsync(TABLE, tblChanger -> {
+                    tblChanger.changeColumns(
+                            listChanger ->
+                                    listChanger.update(IgniteNameUtils.parseSimpleName(colName), colChanger));
+                    return true;
+                }))
         );
     }
 
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
index 4b92592ef2..64622eece3 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
@@ -18,11 +18,13 @@
 package org.apache.ignite.internal.runner.app;
 
 import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -38,6 +40,7 @@ import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.sql.ResultSet;
 import org.apache.ignite.sql.Session;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
@@ -97,7 +100,7 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest {
      * Starts a cluster before every test started.
      */
     @BeforeEach
-    void beforeEach() throws Exception {
+    void beforeEach() {
         List<CompletableFuture<Ignite>> futures = nodesBootstrapCfg.entrySet().stream()
                 .map(e -> IgnitionManager.start(e.getKey(), e.getValue(), workDir.resolve(e.getKey())))
                 .collect(toList());
@@ -125,6 +128,106 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest {
         IgniteUtils.closeAll(closeables);
     }
 
+    /**
+     * Test correctness of schema updates on a lagging node.
+     */
+    @Test
+    public void checkSchemasCorrectUpdate() throws Exception {
+        Ignite ignite0 = clusterNodes.get(0);
+        IgniteImpl ignite1 = (IgniteImpl) clusterNodes.get(1);
+        IgniteImpl ignite2 = (IgniteImpl) clusterNodes.get(2);
+
+        createTable(ignite0, TABLE_NAME);
+
+        TableImpl table = (TableImpl) ignite0.tables().table(TABLE_NAME);
+
+        assertEquals(1, table.schemaView().schema().version());
+
+        WatchListenerInhibitor listenerInhibitor = WatchListenerInhibitor.metastorageEventsInhibitor(ignite1);
+
+        listenerInhibitor.startInhibit();
+
+        alterTable(ignite0, TABLE_NAME);
+
+        table = (TableImpl) ignite2.tables().table(TABLE_NAME);
+
+        TableImpl table0 = table;
+        assertTrue(waitForCondition(() -> table0.schemaView().schema().version() == 2, 5_000));
+
+        table = (TableImpl) ignite1.tables().table(TABLE_NAME);
+
+        assertEquals(1, table.schemaView().schema().version());
+
+        String nodeToStop = ignite1.name();
+
+        IgnitionManager.stop(nodeToStop);
+
+        listenerInhibitor.stopWithoutResend();
+
+        CompletableFuture<Ignite> ignite1Fut = nodesBootstrapCfg.entrySet().stream()
+                .filter(k -> k.getKey().equals(nodeToStop))
+                .map(e -> IgnitionManager.start(e.getKey(), e.getValue(), workDir.resolve(e.getKey())))
+                .findFirst().get();
+
+        ignite1 = (IgniteImpl) ignite1Fut.get();
+
+        table = (TableImpl) ignite1.tables().table(TABLE_NAME);
+
+        TableImpl table1 = table;
+        assertTrue(waitForCondition(() -> table1.schemaView().schema().version() == 2, 5_000));
+    }
+
+    /**
+     * Test correctness of schema recovery after node restart.
+     */
+    @Test
+    public void checkSchemasCorrectlyRestore() throws Exception {
+        Ignite ignite1 = clusterNodes.get(1);
+
+        sql(ignite1, "CREATE TABLE " + TABLE_NAME + "(key BIGINT PRIMARY KEY, valint1 INT, valint2 INT)");
+
+        for (int i = 0; i < 10; ++i) {
+            sql(ignite1, String.format("INSERT INTO " + TABLE_NAME + " VALUES(%d, %d, %d)", i, i, 2 * i));
+        }
+
+        sql(ignite1, "ALTER TABLE " + TABLE_NAME + " DROP COLUMN valint1");
+
+        sql(ignite1, "ALTER TABLE " + TABLE_NAME + " ADD COLUMN valint3 INT");
+
+        sql(ignite1, "ALTER TABLE " + TABLE_NAME + " ADD COLUMN valint4 INT");
+
+        String nodeToStop = ignite1.name();
+
+        IgnitionManager.stop(nodeToStop);
+
+        CompletableFuture<Ignite> ignite1Fut = nodesBootstrapCfg.entrySet().stream()
+                .filter(k -> k.getKey().equals(nodeToStop))
+                .map(e -> IgnitionManager.start(e.getKey(), e.getValue(), workDir.resolve(e.getKey())))
+                .findFirst().get();
+
+        ignite1 = ignite1Fut.get();
+
+        Session ses = ignite1.sql().createSession();
+
+        ResultSet res = ses.execute(null, "SELECT valint2 FROM tbl1");
+
+        for (int i = 0; i < 10; ++i) {
+            assertNotNull(res.next().iterator().next());
+        }
+
+        for (int i = 10; i < 20; ++i) {
+            sql(ignite1, String.format("INSERT INTO " + TABLE_NAME + " VALUES(%d, %d, %d, %d)", i, i, i, i));
+        }
+
+        sql(ignite1, "ALTER TABLE " + TABLE_NAME + " DROP COLUMN valint3");
+
+        sql(ignite1, "ALTER TABLE " + TABLE_NAME + " ADD COLUMN valint5 INT");
+
+        res = ses.execute(null, "SELECT sum(valint4) FROM tbl1");
+
+        assertEquals(res.next().iterator().next(), 10L * (10 + 19) / 2);
+    }
+
     /**
      * The test executes various operation over the lagging node. The operations can be executed only the node overtakes a distributed
      * cluster state.
@@ -154,7 +257,7 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest {
 
         listenerInhibitor.startInhibit();
 
-        sql(ignite0, "ALTER TABLE " + TABLE_NAME + " ADD COLUMN valstr2 VARCHAR NOT NULL DEFAULT 'default'");
+        alterTable(ignite0, TABLE_NAME);
 
         for (Ignite node : clusterNodes) {
             if (node == ignite1) {
@@ -163,7 +266,7 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest {
 
             TableImpl tableOnNode = (TableImpl) node.tables().table(TABLE_NAME);
 
-            IgniteTestUtils.waitForCondition(() -> tableOnNode.schemaView().lastSchemaVersion() == 2, 10_000);
+            waitForCondition(() -> tableOnNode.schemaView().lastSchemaVersion() == 2, 10_000);
         }
 
         TableImpl table1 = (TableImpl) ignite1.tables().table(TABLE_NAME);
@@ -236,6 +339,10 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest {
         sql(node, "CREATE TABLE " + tableName + "(key BIGINT PRIMARY KEY, valint INT, valstr VARCHAR)");
     }
 
+    protected void alterTable(Ignite node, String tableName) {
+        sql(node, "ALTER TABLE " + tableName + " ADD COLUMN valstr2 VARCHAR NOT NULL DEFAULT 'default'");
+    }
+
     protected void sql(Ignite node, String query, Object... args) {
         try (Session session = node.sql().createSession()) {
             session.execute(null, query, args);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index cf47baecae..87adf836c1 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -278,7 +278,7 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
 
         TablesConfiguration tblCfg = clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY);
 
-        SchemaManager schemaManager = new SchemaManager(registry, tblCfg);
+        SchemaManager schemaManager = new SchemaManager(registry, tblCfg, metaStorageMgr);
 
         ReplicaService replicaSvc = new ReplicaService(
                 clusterSvc.messagingService(),
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableApiContractTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableApiContractTest.java
index 8db106a9ba..88a242487c 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableApiContractTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableApiContractTest.java
@@ -122,21 +122,23 @@ public class ItTableApiContractTest extends AbstractBasicIntegrationTest {
                 .changeReplicas(2)
                 .changePartitions(10)));
 
-        await(tableManager().alterTableAsync(TABLE_NAME,
-                chng -> chng.changeColumns(cols -> {
+        await(tableManager().alterTableAsync(TABLE_NAME, chng -> {
+            chng.changeColumns(cols ->
                     cols.create("NAME", colChg -> convert(SchemaBuilders.column("name", ColumnType.string()).asNullable(true)
-                            .withDefaultValue("default").build(), colChg));
-                })));
+                            .withDefaultValue("default").build(), colChg)));
+            return true;
+        }));
 
         assertNotNull(ignite.tables().table(TABLE_NAME));
 
         assertNull(ignite.tables().table(TABLE_NAME + "_not_exist"));
 
-        assertThrows(TableNotFoundException.class, () -> await(tableManager().alterTableAsync(TABLE_NAME + "_not_exist",
-                chng -> chng.changeColumns(cols -> {
+        assertThrows(TableNotFoundException.class, () -> await(tableManager().alterTableAsync(TABLE_NAME + "_not_exist", chng -> {
+            chng.changeColumns(cols ->
                     cols.create("NAME", colChg -> convert(SchemaBuilders.column("name", ColumnType.string()).asNullable(true)
-                            .withDefaultValue("default").build(), colChg));
-                }))));
+                            .withDefaultValue("default").build(), colChg)));
+            return true;
+        })));
     }
 
     /**
@@ -156,16 +158,20 @@ public class ItTableApiContractTest extends AbstractBasicIntegrationTest {
                 .changePartitions(10)));
 
         CompletableFuture<Void> altTblFut1 = tableManager().alterTableAsync(TABLE_NAME,
-                chng -> chng.changeColumns(cols -> {
-                    cols.create("NAME", colChg -> convert(SchemaBuilders.column("NAME", ColumnType.string()).asNullable(true)
-                            .withDefaultValue("default").build(), colChg));
-                }));
+                chng -> {
+                    chng.changeColumns(cols ->
+                            cols.create("NAME", colChg -> convert(SchemaBuilders.column("NAME",
+                                            ColumnType.string()).asNullable(true).withDefaultValue("default").build(), colChg)));
+                return true;
+            });
 
         CompletableFuture<Void> altTblFut2 = tableManager().alterTableAsync(TABLE_NAME + "_not_exist",
-                chng -> chng.changeColumns(cols -> {
-                    cols.create("NAME", colChg -> convert(SchemaBuilders.column("NAME", ColumnType.string()).asNullable(true)
-                            .withDefaultValue("default").build(), colChg));
-                }));
+                chng -> {
+                    chng.changeColumns(cols ->
+                            cols.create("NAME", colChg -> convert(SchemaBuilders.column("NAME",
+                                            ColumnType.string()).asNullable(true).withDefaultValue("default").build(), colChg)));
+                    return true;
+            });
 
         assertNotNull(ignite.tables().table(TABLE_NAME));
 
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
index 069cd4713a..ba9eb80043 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
@@ -543,13 +543,16 @@ public class ItTablesApiTest extends IgniteAbstractTest {
     private void addColumnInternal(Ignite node, String tableName, ColumnDefinition colDefinition) {
         await(((TableManager) node.tables()).alterTableAsync(
                 tableName,
-                chng -> chng.changeColumns(cols -> {
-                    try {
-                        cols.create(colDefinition.name(), colChg -> convert(colDefinition, colChg));
-                    } catch (IllegalArgumentException e) {
-                        throw new ColumnAlreadyExistsException(colDefinition.name());
-                    }
-                })));
+                chng -> {
+                    chng.changeColumns(cols -> {
+                        try {
+                            cols.create(colDefinition.name(), colChg -> convert(colDefinition, colChg));
+                        } catch (IllegalArgumentException e) {
+                            throw new ColumnAlreadyExistsException(colDefinition.name());
+                        }
+                    });
+                    return true;
+                }));
     }
 
     /**
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/internal/InternalSchemaTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/internal/InternalSchemaTest.java
new file mode 100644
index 0000000000..aab4b1e9af
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/internal/InternalSchemaTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.internal;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import org.apache.ignite.internal.schema.configuration.ExtendedTableView;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.sql.engine.AbstractBasicIntegrationTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.junit.jupiter.api.Test;
+
+/** Tests for internal manipulations with schema. */
+public class InternalSchemaTest extends AbstractBasicIntegrationTest {
+    /**
+     * Checks that the schema version is updated even if some of the added column names already exist.
+     */
+    @Test
+    public void checkSchemaUpdatedWithEqAlterColumn() {
+        IgniteSql sql = igniteSql();
+        Session ses = sql.createSession();
+
+        checkDdl(true, ses, "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+
+        Ignite node = CLUSTER_NODES.get(0);
+
+        ConfigurationManager cfgMgr = IgniteTestUtils.getFieldValue(node, "clusterCfgMgr");
+
+        final TablesConfiguration tablesConfiguration = cfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY);
+
+        int schIdBefore = ((ExtendedTableView) tablesConfiguration.tables().get("TEST").value()).schemaId();
+
+        checkDdl(false, ses, "ALTER TABLE TEST ADD COLUMN IF NOT EXISTS (VAL0 INT, VAL1 INT)");
+
+        int schIdAfter = ((ExtendedTableView) tablesConfiguration.tables().get("TEST").value()).schemaId();
+
+        assertEquals(schIdBefore + 1, schIdAfter);
+    }
+
+    /** Tests that the schema is mapped correctly after columns are dropped. */
+    @Test
+    public void testDropColumns() {
+        IgniteSql sql = igniteSql();
+        Session ses = sql.createSession();
+
+        checkDdl(true, ses, "CREATE TABLE my (c1 INT PRIMARY KEY, c2 INT, c3 VARCHAR)");
+
+        ses.execute(
+                null,
+                "INSERT INTO my VALUES (1, 2, '3')"
+        );
+
+        ResultSet res = ses.execute(
+                null,
+                "SELECT c1, c3 FROM my"
+        );
+
+        assertTrue(res.hasNext());
+
+        checkDdl(true, ses, "ALTER TABLE my DROP COLUMN c2");
+
+        res = ses.execute(
+                null,
+                "SELECT c1, c3 FROM my"
+        );
+
+        assertNotNull(res.next());
+
+        checkDdl(true, ses, "ALTER TABLE my ADD COLUMN (c2 INT, c4 VARCHAR)");
+
+        res = ses.execute(
+                null,
+                "SELECT c1, c3 FROM my"
+        );
+
+        assertNotNull(res.next());
+    }
+
+    private static void checkDdl(boolean expectedApplied, Session ses, String sql) {
+        ResultSet res = ses.execute(
+                null,
+                sql
+        );
+
+        assertEquals(expectedApplied, res.wasApplied());
+        assertFalse(res.hasRowSet());
+        assertEquals(-1, res.affectedRows());
+
+        res.close();
+    }
+
+    /**
+     * Gets the SQL API.
+     *
+     * @return SQL API.
+     */
+    protected IgniteSql igniteSql() {
+        return CLUSTER_NODES.get(0).sql();
+    }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
index f6577fc689..a122ffe652 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
@@ -148,4 +148,13 @@ public class WatchListenerInhibitor implements WatchListener {
 
         inhibitEvents.clear();
     }
+
+    /**
+     * Stops inhibiting silently: buffered events are discarded without being resent.
+     */
+    public synchronized void stopWithoutResend() {
+        inhibit = false;
+
+        inhibitEvents.clear();
+    }
 }
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index f8cf815bf1..621a8ebdde 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -359,7 +359,7 @@ public class IgniteImpl implements Ignite {
                 )
         );
 
-        schemaManager = new SchemaManager(registry, tablesConfiguration);
+        schemaManager = new SchemaManager(registry, tablesConfiguration, metaStorageMgr);
 
         volatileLogStorageFactoryCreator = new VolatileLogStorageFactoryCreator(workDir.resolve("volatile-log-spillout"));
 
diff --git a/modules/schema/build.gradle b/modules/schema/build.gradle
index 04cea165e0..f3086f3765 100644
--- a/modules/schema/build.gradle
+++ b/modules/schema/build.gradle
@@ -29,6 +29,7 @@ dependencies {
     implementation project(':ignite-bytecode')
     implementation project(':ignite-core')
     implementation project(':ignite-configuration')
+    implementation project(':ignite-metastorage')
     implementation libs.jetbrains.annotations
 
     testAnnotationProcessor project(':ignite-configuration-annotation-processor')
diff --git a/modules/schema/pom.xml b/modules/schema/pom.xml
index 485723eb2e..9131612929 100644
--- a/modules/schema/pom.xml
+++ b/modules/schema/pom.xml
@@ -58,6 +58,11 @@
             <artifactId>ignite-binary-tuple</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-metastorage</artifactId>
+        </dependency>
+
         <!-- 3rd party dependencies -->
         <dependency>
             <groupId>org.jetbrains</groupId>
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index d5e70fb272..e6cb7a0c21 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -19,42 +19,43 @@ package org.apache.ignite.internal.schema;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
-import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.getByInternalId;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.NoSuchElementException;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.function.Function;
-import org.apache.ignite.configuration.ConfigurationProperty;
 import org.apache.ignite.configuration.NamedListView;
-import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
 import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
 import org.apache.ignite.internal.causality.VersionedValue;
-import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.manager.Producer;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.Conditions;
+import org.apache.ignite.internal.metastorage.client.Entry;
+import org.apache.ignite.internal.metastorage.client.Operations;
+import org.apache.ignite.internal.schema.configuration.ColumnView;
 import org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
-import org.apache.ignite.internal.schema.configuration.SchemaConfiguration;
-import org.apache.ignite.internal.schema.configuration.SchemaView;
+import org.apache.ignite.internal.schema.configuration.ExtendedTableView;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.schema.event.SchemaEvent;
 import org.apache.ignite.internal.schema.event.SchemaEventParameters;
 import org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl;
 import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteStringFormatter;
-import org.apache.ignite.lang.IgniteSystemProperties;
 import org.apache.ignite.lang.IgniteTriConsumer;
 import org.apache.ignite.lang.NodeStoppingException;
-import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -64,13 +65,8 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
     /** Initial version for schemas. */
     public static final int INITIAL_SCHEMA_VERSION = 1;
 
-    /**
-     * If this property is set to {@code true} then an attempt to get the configuration property directly from the meta storage will be
-     * skipped, and the local property will be returned.
-     * TODO: IGNITE-16774 This property and overall approach, access configuration directly through the Metostorage,
-     * TODO: will be removed after fix of the issue.
-     */
-    private final boolean getMetadataLocallyOnly = IgniteSystemProperties.getBoolean("IGNITE_GET_METADATA_LOCALLY_ONLY");
+    /** Infix of a schema history key in the metastorage, placed between the table id and the schema version. */
+    public static final String SCHEMA_STORE_PREFIX = ".sch-hist.";
 
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -84,50 +80,112 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
     /** Versioned store for tables by name. */
     private final VersionedValue<Map<UUID, SchemaRegistryImpl>> registriesVv;
 
+    /** Meta storage manager. */
+    private final MetaStorageManager metastorageMgr;
+
     /** Constructor. */
-    public SchemaManager(Consumer<Function<Long, CompletableFuture<?>>> registry, TablesConfiguration tablesCfg) {
+    public SchemaManager(
+            Consumer<Function<Long, CompletableFuture<?>>> registry,
+            TablesConfiguration tablesCfg,
+            MetaStorageManager metastorageMgr
+    ) {
         this.registriesVv = new VersionedValue<>(registry, HashMap::new);
-
         this.tablesCfg = tablesCfg;
+        this.metastorageMgr = metastorageMgr;
     }
 
     /** {@inheritDoc} */
     @Override
     public void start() {
-        ((ExtendedTableConfiguration) tablesCfg.tables().any()).schemas().listenElements(new ConfigurationNamedListListener<>() {
-            @Override
-            public CompletableFuture<?> onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
-                return onSchemaCreate(schemasCtx);
+        for (String tblName : tablesCfg.tables().value().namedListKeys()) {
+            ExtendedTableConfiguration tblCfg = ((ExtendedTableConfiguration) tablesCfg.tables().get(tblName));
+            UUID tblId = tblCfg.id().value();
+
+            Map<Integer, byte[]> schemas = collectAllSchemas(tblId);
+
+            byte[] serialized;
+
+            if (!schemas.isEmpty()) {
+                for (Map.Entry<Integer, byte[]> ent : schemas.entrySet()) {
+                    serialized = ent.getValue();
+
+                    SchemaDescriptor desc = SchemaSerializerImpl.INSTANCE.deserialize(serialized);
+
+                    createSchema(0, tblId, tblName, desc).join();
+                }
+
+                registriesVv.complete(0);
+            } else {
+                serialized = schemas.get(INITIAL_SCHEMA_VERSION);
+
+                assert serialized != null;
             }
-        });
+        }
+
+        tablesCfg.tables().any().columns().listen(this::onSchemaChange);
     }
 
     /**
      * Listener of schema configuration changes.
      *
-     * @param schemasCtx Schemas configuration context.
+     * @param ctx Configuration context.
      * @return A future.
      */
-    private CompletableFuture<?> onSchemaCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
+    private CompletableFuture<?> onSchemaChange(ConfigurationNotificationEvent<NamedListView<ColumnView>> ctx) {
         if (!busyLock.enterBusy()) {
             return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()));
         }
 
         try {
-            long causalityToken = schemasCtx.storageRevision();
+            ExtendedTableView tblCfg = (ExtendedTableView) ctx.config(ExtendedTableConfiguration.class).value();
 
-            ExtendedTableConfiguration tblCfg = schemasCtx.config(ExtendedTableConfiguration.class);
+            int verFromUpdate = tblCfg.schemaId();
 
-            UUID tblId = tblCfg.id().value();
+            UUID tblId = tblCfg.id();
 
-            String tableName = tblCfg.name().value();
+            String tableName = tblCfg.name();
 
-            SchemaDescriptor schemaDescriptor = SchemaSerializerImpl.INSTANCE.deserialize((schemasCtx.newValue().schema()));
+            SchemaDescriptor schemaDescFromUpdate = SchemaUtils.prepareSchemaDescriptor(verFromUpdate, tblCfg);
 
-            CompletableFuture<?> createSchemaFut = createSchema(causalityToken, tblId, tableName, schemaDescriptor);
+            if (searchSchemaByVersion(tblId, schemaDescFromUpdate.version()) != null) {
+                return completedFuture(null);
+            }
 
-            registriesVv.get(causalityToken).thenRun(() -> inBusyLock(busyLock,
-                    () -> fireEvent(SchemaEvent.CREATE, new SchemaEventParameters(causalityToken, tblId, schemaDescriptor))));
+            if (verFromUpdate != INITIAL_SCHEMA_VERSION) {
+                SchemaDescriptor oldSchema = searchSchemaByVersion(tblId, verFromUpdate - 1);
+                assert oldSchema != null;
+
+                NamedListView<ColumnView> oldCols = ctx.oldValue();
+                NamedListView<ColumnView> newCols = ctx.newValue();
+
+                schemaDescFromUpdate.columnMapping(SchemaUtils.columnMapper(
+                        oldSchema,
+                        oldCols,
+                        schemaDescFromUpdate,
+                        newCols));
+            }
+
+            long causalityToken = ctx.storageRevision();
+
+            CompletableFuture<?> createSchemaFut = createSchema(causalityToken, tblId, tableName, schemaDescFromUpdate);
+
+            try {
+                final ByteArray key = schemaWithVerHistKey(tblId, verFromUpdate);
+
+                createSchemaFut.thenCompose(t -> metastorageMgr.invoke(
+                        Conditions.notExists(key),
+                        Operations.put(key, SchemaSerializerImpl.INSTANCE.serialize(schemaDescFromUpdate)),
+                        Operations.noop()));
+            } catch (Throwable th) {
+                createSchemaFut.completeExceptionally(th);
+            }
+
+            createSchemaFut.whenComplete((ignore, th) -> {
+                if (th == null) {
+                    registriesVv.get(causalityToken).thenRun(() -> inBusyLock(busyLock,
+                            () -> fireEvent(SchemaEvent.CREATE, new SchemaEventParameters(causalityToken, tblId, schemaDescFromUpdate))));
+                }
+            });
 
             return createSchemaFut;
         } finally {
@@ -237,18 +295,32 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
      * @param schemaVer Schema version.
      * @return Schema descriptor.
      */
-    private SchemaDescriptor tableSchema(UUID tblId, String tableName, int schemaVer) {
+    private CompletableFuture<SchemaDescriptor> tableSchema(UUID tblId, String tableName, int schemaVer) {
         ExtendedTableConfiguration tblCfg = ((ExtendedTableConfiguration) tablesCfg.tables().get(tableName));
 
+        CompletableFuture<SchemaDescriptor> fut = new CompletableFuture<>();
+
         if (checkSchemaVersion(tblId, schemaVer)) {
-            return getSchemaDescriptorLocally(schemaVer, tblCfg);
-        }
+            SchemaDescriptor desc = searchSchemaByVersion(tblId, schemaVer);
 
-        CompletableFuture<SchemaDescriptor> fut = new CompletableFuture<>();
+            if (desc == null) {
+                return getSchemaDescriptor(schemaVer, tblCfg);
+            } else {
+                fut.complete(desc);
+                return fut;
+            }
+        }
 
         IgniteTriConsumer<Long, Map<UUID, SchemaRegistryImpl>, Throwable> schemaListener = (token, regs, e) -> {
             if (schemaVer <= regs.get(tblId).lastSchemaVersion()) {
-                fut.complete(getSchemaDescriptorLocally(schemaVer, tblCfg));
+                getSchemaDescriptor(schemaVer, tblCfg)
+                        .whenComplete((desc, th) -> {
+                            if (th != null) {
+                                fut.completeExceptionally(th);
+                            } else {
+                                fut.complete(desc);
+                            }
+                        });
             }
         };
 
@@ -259,10 +331,13 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
         if (checkSchemaVersion(tblId, schemaVer)) {
             registriesVv.removeWhenComplete(schemaListener);
 
-            return getSchemaDescriptorLocally(schemaVer, tblCfg);
+            return getSchemaDescriptor(schemaVer, tblCfg);
         }
 
-        return fut.whenComplete((unused, throwable) -> registriesVv.removeWhenComplete(schemaListener)).join();
+        return fut.thenApply(res -> {
+            registriesVv.removeWhenComplete(schemaListener);
+            return res;
+        });
     }
 
     /**
@@ -281,59 +356,34 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
     }
 
     /**
-     * Checks that the schema is configured in the Metasorage consensus.
+     * Tries to find a schema of the given version in the latest known schema registry of the table.
      *
      * @param tblId Table id.
      * @param schemaVer Schema version.
-     * @return True when the schema configured, false otherwise.
+     * @return Descriptor if the required schema is found, {@code null} otherwise.
      */
-    private boolean isSchemaExists(UUID tblId, int schemaVer) {
-        return latestSchemaVersion(tblId) >= schemaVer;
-    }
-
-    /**
-     * Gets the latest version of the table schema which available in Metastore.
-     *
-     * @param tblId Table id.
-     * @return The latest schema version.
-     */
-    private int latestSchemaVersion(UUID tblId) {
-        try {
-            NamedListView<SchemaView> tblSchemas = ((ExtendedTableConfiguration) getByInternalId(directProxy(tablesCfg.tables()), tblId))
-                    .schemas().value();
-
-            int lastVer = INITIAL_SCHEMA_VERSION;
-
-            for (String schemaVerAsStr : tblSchemas.namedListKeys()) {
-                int ver = Integer.parseInt(schemaVerAsStr);
-
-                if (ver > lastVer) {
-                    lastVer = ver;
-                }
-            }
-
-            return lastVer;
-        } catch (NoSuchElementException e) {
-            assert false : "Table must exist. [tableId=" + tblId + ']';
+    private SchemaDescriptor searchSchemaByVersion(UUID tblId, int schemaVer) {
+        SchemaRegistry registry = registriesVv.latest().get(tblId);
 
-            return INITIAL_SCHEMA_VERSION;
+        if (registry != null && schemaVer <= registry.lastSchemaVersion()) {
+            return registry.schema(schemaVer);
+        } else {
+            return null;
         }
     }
 
     /**
-     * Gets a schema descriptor from the local node configuration storage.
+     * Gets a schema descriptor from the metastorage.
      *
      * @param schemaVer Schema version.
      * @param tblCfg    Table configuration.
      * @return Schema descriptor.
      */
-    @NotNull
-    private SchemaDescriptor getSchemaDescriptorLocally(int schemaVer, ExtendedTableConfiguration tblCfg) {
-        SchemaConfiguration schemaCfg = tblCfg.schemas().get(String.valueOf(schemaVer));
+    private CompletableFuture<SchemaDescriptor> getSchemaDescriptor(int schemaVer, ExtendedTableConfiguration tblCfg) {
+        CompletableFuture<Entry> ent = metastorageMgr.get(
+                schemaWithVerHistKey(tblCfg.id().value(), schemaVer));
 
-        assert schemaCfg != null;
-
-        return SchemaSerializerImpl.INSTANCE.deserialize(schemaCfg.schema().value());
+        return ent.thenApply(e -> SchemaSerializerImpl.INSTANCE.deserialize(e.value()));
     }
 
     /**
@@ -400,15 +450,83 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
     }
 
     /**
-     * Gets a direct accessor for the configuration distributed property.
-     * If the metadata access only locally configured the method will return local property accessor.
+     * Gets the latest version of the table schema which is available in the metastorage.
+     *
+     * @param tblId Table id.
+     * @return The latest schema version.
+     */
+    private int latestSchemaVersion(UUID tblId) {
+        try {
+            Cursor<Entry> cur = metastorageMgr.prefix(schemaHistPrefix(tblId));
+
+            int lastVer = INITIAL_SCHEMA_VERSION;
+
+            for (Entry ent : cur) {
+                String key = ent.key().toString();
+                int descVer = extractVerFromSchemaKey(key);
+
+                if (descVer > lastVer) {
+                    lastVer = descVer;
+                }
+            }
+
+            return lastVer;
+        } catch (NodeStoppingException e) {
+            throw new IgniteException(e.traceId(), e.code(), e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Collects all schemas of the given table that are stored in the metastorage.
      *
-     * @param property Distributed configuration property to receive direct access.
-     * @param <T> Type of the property accessor.
-     * @return An accessor for distributive property.
-     * @see #getMetadataLocallyOnly
+     * @param tblId Table id.
+     * @return Serialized schemas keyed by schema version, sorted by version.
+     */
+    private SortedMap<Integer, byte[]> collectAllSchemas(UUID tblId) {
+        try {
+            Cursor<Entry> cur = metastorageMgr.prefix(schemaHistPrefix(tblId));
+
+            SortedMap<Integer, byte[]> schemes = new TreeMap<>();
+
+            for (Entry ent : cur) {
+                String key = ent.key().toString();
+                int descVer = extractVerFromSchemaKey(key);
+
+                schemes.put(descVer, ent.value());
+            }
+
+            return schemes;
+        } catch (NodeStoppingException e) {
+            throw new IgniteException(e.traceId(), e.code(), e.getMessage(), e);
+        }
+    }
+
+    private int extractVerFromSchemaKey(String key) {
+        int pos = key.lastIndexOf('.');
+        assert pos != -1 : "Unexpected key: " + key;
+
+        key = key.substring(pos + 1);
+        return Integer.parseInt(key);
+    }
+
+    /**
+     * Forms schema history key.
+     *
+     * @param tblId Table id.
+     * @param ver Schema version.
+     * @return {@link ByteArray} representation.
+     */
+    private static ByteArray schemaWithVerHistKey(UUID tblId, int ver) {
+        return ByteArray.fromString(tblId + SCHEMA_STORE_PREFIX + ver);
+    }
+
+    /**
+     * Forms the key prefix shared by all schema history entries of the table.
+     *
+     * @param tblId Table id.
+     * @return {@link ByteArray} representation.
      */
-    private <T extends ConfigurationProperty<?>> T directProxy(T property) {
-        return getMetadataLocallyOnly ? property : ConfigurationUtil.directProxy(property);
+    private static ByteArray schemaHistPrefix(UUID tblId) {
+        return ByteArray.fromString(tblId + SCHEMA_STORE_PREFIX);
     }
 }
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
index c648de1531..ffaaa97fa5 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
@@ -61,7 +61,7 @@ public interface SchemaRegistry {
     /**
      * Get last registered schema version.
      */
-    public int lastSchemaVersion();
+    int lastSchemaVersion();
 
     /**
      * Resolve binary row against given schema.
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java
index d0e942d13d..b211ff5e39 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java
@@ -58,37 +58,39 @@ public class SchemaUtils {
     ) {
         ColumnMapper mapper = null;
 
-        // since newTblColumns comes from a Change class, it can only be of the same size or larger than the previous configuration,
-        // because removed keys are simply replaced with nulls
-        assert newTblColumns.size() >= oldTblColumns.size();
-
         for (int i = 0; i < newTblColumns.size(); ++i) {
             ColumnView newColView = newTblColumns.get(i);
 
-            // new value can be null if a column has been deleted
-            if (newColView == null) {
-                continue;
-            }
+            assert newColView != null;
+
+            Column newCol = newDesc.column(newColView.name());
 
             if (i < oldTblColumns.size()) {
                 ColumnView oldColView = oldTblColumns.get(i);
 
-                Column newCol = newDesc.column(newColView.name());
                 Column oldCol = oldDesc.column(oldColView.name());
 
                 if (newCol.schemaIndex() == oldCol.schemaIndex()) {
-                    continue;
+                    if (!newCol.name().equals(oldCol.name())) {
+                        if (mapper == null) {
+                            mapper = ColumnMapping.createMapper(newDesc);
+                        }
+
+                        Column oldIdx = oldDesc.column(newColView.name());
+
+                        // rename
+                        if (oldIdx != null) {
+                            mapper.add(newCol.schemaIndex(), oldIdx.schemaIndex());
+                        }
+                    }
+                } else {
+                    if (mapper == null) {
+                        mapper = ColumnMapping.createMapper(newDesc);
+                    }
+
+                    mapper.add(newCol.schemaIndex(), oldCol.schemaIndex());
                 }
-
-                if (mapper == null) {
-                    mapper = ColumnMapping.createMapper(newDesc);
-                }
-
-                mapper.add(newCol.schemaIndex(), oldCol.schemaIndex());
             } else {
-                // if the new Named List is larger than the old one, it can only mean that a new column has been added
-                Column newCol = newDesc.column(newColView.name());
-
                 assert !newDesc.isKeyColumn(newCol.schemaIndex());
 
                 if (mapper == null) {
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/ExtendedTableConfigurationSchema.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/ExtendedTableConfigurationSchema.java
index 1387ba3daa..a7780ca66e 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/ExtendedTableConfigurationSchema.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/ExtendedTableConfigurationSchema.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.schema.configuration;
 import java.util.UUID;
 import org.apache.ignite.configuration.annotation.InternalConfiguration;
 import org.apache.ignite.configuration.annotation.InternalId;
-import org.apache.ignite.configuration.annotation.NamedConfigValue;
 import org.apache.ignite.configuration.annotation.Value;
 
 /**
@@ -34,13 +33,13 @@ public class ExtendedTableConfigurationSchema extends TableConfigurationSchema {
     public UUID id;
 
     /**
-     * Serialized version of an affinity assignments. Currently configuration doesn't support neither collections nor array of arrays, so
+     * Serialized version of affinity assignments. Currently, configuration supports neither collections nor arrays of arrays, so
      * that serialization was chosen.
      */
     @Value
     public byte[] assignments;
 
-    /** Schemas history as named list where name is schema version and value is serialized version of schema itself. */
-    @NamedConfigValue
-    public SchemaConfigurationSchema schemas;
+    /** Current schema id. Monotonically increasing number. */
+    @Value
+    public int schemaId;
 }
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationSchema.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationSchema.java
deleted file mode 100644
index eead4a22e6..0000000000
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationSchema.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.schema.configuration;
-
-import org.apache.ignite.configuration.annotation.Config;
-import org.apache.ignite.configuration.annotation.Value;
-
-/**
- * Configuration schema of a table schema. Part of an internal configuration.
- */
-@Config
-public class SchemaConfigurationSchema {
-    /** Serialized table schema configuration. */
-    @Value
-    public byte[] schema;
-}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
index 5c79fcfbe3..c75e9ac2b1 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -48,7 +49,7 @@ public class SchemaRegistryImpl implements SchemaRegistry {
     private volatile int lastVer;
 
     /** Schema store. */
-    private final Function<Integer, SchemaDescriptor> history;
+    private final Function<Integer, CompletableFuture<SchemaDescriptor>> history;
 
     /** The method to provide the latest schema version on cluster. */
     private final IntSupplier latestVersionStore;
@@ -61,7 +62,7 @@ public class SchemaRegistryImpl implements SchemaRegistry {
      * @param initialSchema      Initial schema.
      */
     public SchemaRegistryImpl(
-            Function<Integer, SchemaDescriptor> history,
+            Function<Integer, CompletableFuture<SchemaDescriptor>> history,
             IntSupplier latestVersionStore,
             SchemaDescriptor initialSchema
     ) {
@@ -86,7 +87,11 @@ public class SchemaRegistryImpl implements SchemaRegistry {
             return desc;
         }
 
-        desc = history.apply(ver);
+        CompletableFuture<SchemaDescriptor> descFut = history.apply(ver);
+
+        if (descFut != null) {
+            desc = descFut.join();
+        }
 
         if (desc != null) {
             schemaCache.putIfAbsent(ver, desc);
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
index db1c9cfbd7..7d357804de 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
@@ -77,7 +77,7 @@ class UpgradingRowAdapter extends Row {
      * Map column.
      *
      * @param colIdx Column index in source schema.
-     * @return Column index in targer schema.
+     * @return Column index in target schema.
      */
     private int mapColumn(int colIdx) throws InvalidTypeException {
         return mapper.map(colIdx);
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
index 8f9535a639..f6aacd6761 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
@@ -29,6 +29,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.Arrays;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -343,7 +344,7 @@ public class SchemaRegistryImplTest {
                         new Column("valStringCol", STRING, true)
                 });
 
-        Map<Integer, SchemaDescriptor> history = schemaHistory(schemaV1, schemaV2);
+        Map<Integer, CompletableFuture<SchemaDescriptor>> history = schemaHistory(schemaV1, schemaV2);
 
         final SchemaRegistryImpl reg = new SchemaRegistryImpl(history::get, () -> INITIAL_SCHEMA_VERSION, schemaV2);
 
@@ -409,7 +410,7 @@ public class SchemaRegistryImplTest {
                 new Column[]{new Column("keyLongCol", INT64, false)},
                 new Column[]{new Column("valStringCol", STRING, true)});
 
-        Map<Integer, SchemaDescriptor> history = schemaHistory(schemaV2, schemaV3);
+        Map<Integer, CompletableFuture<SchemaDescriptor>> history = schemaHistory(schemaV2, schemaV3);
 
         final SchemaRegistryImpl reg = new SchemaRegistryImpl(history::get, () -> INITIAL_SCHEMA_VERSION, schemaV3);
 
@@ -475,7 +476,7 @@ public class SchemaRegistryImplTest {
                         new Column("valStringCol", STRING, true)
                 });
 
-        Map<Integer, SchemaDescriptor> history = schemaHistory(schemaV2, schemaV3, schemaV4);
+        Map<Integer, CompletableFuture<SchemaDescriptor>> history = schemaHistory(schemaV2, schemaV3, schemaV4);
 
         final SchemaRegistryImpl reg = new SchemaRegistryImpl(history::get, () -> INITIAL_SCHEMA_VERSION, schemaV4);
 
@@ -576,8 +577,8 @@ public class SchemaRegistryImplTest {
      * @param history Table schema history.
      * @return Schema history map.
      */
-    private Map<Integer, SchemaDescriptor> schemaHistory(SchemaDescriptor... history) {
-        return Arrays.stream(history).collect(Collectors.toMap(SchemaDescriptor::version, e -> e));
+    private Map<Integer, CompletableFuture<SchemaDescriptor>> schemaHistory(SchemaDescriptor... history) {
+        return Arrays.stream(history).collect(Collectors.toMap(SchemaDescriptor::version, CompletableFuture::completedFuture));
     }
 
     /**
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java
index 76aa829f37..d08a6035c6 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.schema.registry;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.ignite.internal.schema.NativeTypes.BYTES;
 import static org.apache.ignite.internal.schema.NativeTypes.DATE;
 import static org.apache.ignite.internal.schema.NativeTypes.DOUBLE;
@@ -151,12 +152,14 @@ public class UpgradingRowAdapterTest {
         ByteBufferRow row = new ByteBufferRow(serializeValuesToRow(schema, values));
 
         // Validate row.
-        validateRow(values, new SchemaRegistryImpl(v -> v == 1 ? schema : schema2, () -> INITIAL_SCHEMA_VERSION, schema), row);
+        validateRow(values, new SchemaRegistryImpl(v -> v == 1 ? completedFuture(schema) : completedFuture(schema2),
+                () -> INITIAL_SCHEMA_VERSION, schema), row);
 
         // Validate upgraded row.
         values.add(addedColumnIndex, null);
 
-        validateRow(values, new SchemaRegistryImpl(v -> v == 1 ? schema : schema2, () -> schema2.version(), schema2), row);
+        validateRow(values, new SchemaRegistryImpl(v -> v == 1 ? completedFuture(schema) : completedFuture(schema2),
+                () -> schema2.version(), schema2), row);
     }
 
     private void validateRow(List<Object> values, SchemaRegistryImpl schemaRegistry, ByteBufferRow binaryRow) {
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
index 72df17a58f..b3927b655d 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
@@ -282,43 +282,51 @@ public class DdlCommandHandler {
      * @return {@code true} if the full columns set is applied successfully. Otherwise, returns {@code false}.
      */
     private CompletableFuture<Boolean> addColumnInternal(String fullName, List<ColumnDefinition> colsDef, boolean ignoreColumnExistance) {
-        AtomicBoolean ret = new AtomicBoolean(true);
+        AtomicBoolean retUsr = new AtomicBoolean(true);
 
         return tableManager.alterTableAsync(
                 fullName,
-                chng -> chng.changeColumns(cols -> {
-                    ret.set(true); // Reset state if closure have been restarted.
+                chng -> {
+                    AtomicBoolean retTbl = new AtomicBoolean();
 
-                    Map<String, String> colNamesToOrders = columnOrdersToNames(chng.columns());
+                    chng.changeColumns(cols -> {
+                        retUsr.set(true); // Reset state if the closure has been restarted.
 
-                    List<ColumnDefinition> colsDef0;
+                        Map<String, String> colNamesToOrders = columnOrdersToNames(chng.columns());
 
-                    if (ignoreColumnExistance) {
-                        colsDef0 = colsDef.stream().filter(k -> {
-                            if (colNamesToOrders.containsKey(k.name())) {
-                                ret.set(false);
+                        List<ColumnDefinition> colsDef0;
 
-                                return false;
-                            } else {
-                                return true;
-                            }
-                        }).collect(Collectors.toList());
-                    } else {
-                        colsDef.stream()
-                                .filter(k -> colNamesToOrders.containsKey(k.name()))
-                                .findAny()
-                                .ifPresent(c -> {
-                                    throw new ColumnAlreadyExistsException(c.name());
-                                });
-
-                        colsDef0 = colsDef;
-                    }
+                        if (ignoreColumnExistance) {
+                            colsDef0 = colsDef.stream().filter(k -> {
+                                if (colNamesToOrders.containsKey(k.name())) {
+                                    retUsr.set(false);
 
-                    for (ColumnDefinition col : colsDef0) {
-                        cols.create(col.name(), colChg -> convertColumnDefinition(col, colChg));
-                    }
-                })
-        ).thenApply(v -> ret.get());
+                                    return false;
+                                } else {
+                                    return true;
+                                }
+                            }).collect(Collectors.toList());
+                        } else {
+                            colsDef.stream()
+                                    .filter(k -> colNamesToOrders.containsKey(k.name()))
+                                    .findAny()
+                                    .ifPresent(c -> {
+                                        throw new ColumnAlreadyExistsException(c.name());
+                                    });
+
+                            colsDef0 = colsDef;
+                        }
+
+                        for (ColumnDefinition col : colsDef0) {
+                            cols.create(col.name(), colChg -> convertColumnDefinition(col, colChg));
+                        }
+
+                        retTbl.set(!colsDef0.isEmpty());
+                    });
+
+                    return retTbl.get();
+                }
+        ).thenApply(v -> retUsr.get());
     }
 
     private void convertColumnDefinition(ColumnDefinition definition, ColumnChange columnChange) {
@@ -368,37 +376,40 @@ public class DdlCommandHandler {
 
         return tableManager.alterTableAsync(
                 tableName,
-                chng -> chng.changeColumns(cols -> {
-                    ret.set(true); // Reset state if closure have been restarted.
+                chng -> {
+                    chng.changeColumns(cols -> {
+                        ret.set(true); // Reset state if the closure has been restarted.
 
-                    PrimaryKeyView priKey = chng.primaryKey();
+                        PrimaryKeyView priKey = chng.primaryKey();
 
-                    Map<String, String> colNamesToOrders = columnOrdersToNames(chng.columns());
+                        Map<String, String> colNamesToOrders = columnOrdersToNames(chng.columns());
 
-                    Set<String> colNames0 = new HashSet<>();
+                        Set<String> colNames0 = new HashSet<>();
 
-                    Set<String> primaryCols = Set.of(priKey.columns());
+                        Set<String> primaryCols = Set.of(priKey.columns());
+
+                        for (String colName : colNames) {
+                            if (!colNamesToOrders.containsKey(colName)) {
+                                ret.set(false);
 
-                    for (String colName : colNames) {
-                        if (!colNamesToOrders.containsKey(colName)) {
-                            ret.set(false);
+                                if (!ignoreColumnExistence) {
+                                    throw new ColumnNotFoundException(DEFAULT_SCHEMA_NAME, tableName, colName);
+                                }
+                            } else {
+                                colNames0.add(colName);
+                            }
 
-                            if (!ignoreColumnExistence) {
-                                throw new ColumnNotFoundException(DEFAULT_SCHEMA_NAME, tableName, colName);
+                            if (primaryCols.contains(colName)) {
+                                throw new SqlException(DEL_PK_COMUMN_CONSTRAINT_ERR, IgniteStringFormatter
+                                        .format("Can`t delete column, belongs to primary key: [name={}]", colName));
                             }
-                        } else {
-                            colNames0.add(colName);
                         }
 
-                        if (primaryCols.contains(colName)) {
-                            throw new SqlException(DEL_PK_COMUMN_CONSTRAINT_ERR, IgniteStringFormatter
-                                    .format("Can`t delete column, belongs to primary key: [name={}]", colName));
-                        }
-                    }
+                        colNames0.forEach(k -> cols.delete(colNamesToOrders.get(k)));
+                    });
 
-                    colNames0.forEach(k -> cols.delete(colNamesToOrders.get(k)));
-                }))
-                .thenApply(v -> ret.get());
+                    return ret.get();
+                }).thenApply(v -> ret.get());
     }
 
     private static void convert(NativeType colType, ColumnTypeChange colTypeChg) {
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
index e2d560e357..6ee94075e2 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
@@ -151,7 +151,7 @@ public class StopCalciteModuleTest {
                 new Column[]{new Column(1, "VAL", NativeTypes.INT32, false)}
         );
 
-        schemaReg = new SchemaRegistryImpl((v) -> schemaDesc, () -> INITIAL_SCHEMA_VERSION, schemaDesc);
+        schemaReg = new SchemaRegistryImpl((v) -> completedFuture(schemaDesc), () -> INITIAL_SCHEMA_VERSION, schemaDesc);
 
         when(tbl.name()).thenReturn("TEST");
 
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 0cad686300..2d9618294f 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -18,17 +18,12 @@
 package org.apache.ignite.internal.sql.engine.exec;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
-import static org.apache.ignite.internal.schema.configuration.storage.UnknownDataStorageConfigurationSchema.UNKNOWN_DATA_STORAGE;
 import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine.ENGINE_NAME;
 import static org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfigurationSchema.DEFAULT_DATA_REGION_NAME;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.startsWith;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -37,12 +32,10 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -61,36 +54,26 @@ import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaManager;
-import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.schema.SchemaUtils;
 import org.apache.ignite.internal.schema.configuration.TableView;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult;
 import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
-import org.apache.ignite.internal.sql.engine.QueryContext;
 import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
-import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
-import org.apache.ignite.internal.sql.engine.session.SessionId;
 import org.apache.ignite.internal.storage.DataStorageManager;
 import org.apache.ignite.internal.storage.DataStorageModules;
 import org.apache.ignite.internal.storage.impl.TestDataStorageModule;
-import org.apache.ignite.internal.storage.impl.TestStorageEngine;
 import org.apache.ignite.internal.storage.impl.schema.TestDataStorageConfigurationSchema;
-import org.apache.ignite.internal.storage.impl.schema.TestDataStorageView;
 import org.apache.ignite.internal.storage.rocksdb.RocksDbDataStorageModule;
 import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageConfigurationSchema;
 import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageView;
 import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
-import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
-import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.lang.ColumnAlreadyExistsException;
-import org.apache.ignite.lang.ColumnNotFoundException;
 import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.lang.IndexAlreadyExistsException;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.lang.TableAlreadyExistsException;
 import org.apache.ignite.lang.TableNotFoundException;
@@ -106,7 +89,6 @@ import org.apache.ignite.sql.SqlException;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
@@ -217,6 +199,8 @@ public class MockedStructuresTest extends IgniteAbstractTest {
         when(rm.messagingService()).thenReturn(mock(MessagingService.class));
         when(rm.topologyService()).thenReturn(mock(TopologyService.class));
 
+        mockMetastore();
+
         revisionUpdater = (Function<Long, CompletableFuture<?>> function) -> {
             function.apply(0L).join();
 
@@ -241,7 +225,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
 
         dataStorageManager.start();
 
-        schemaManager = new SchemaManager(revisionUpdater, tblsCfg);
+        schemaManager = new SchemaManager(revisionUpdater, tblsCfg, msm);
 
         schemaManager.start();
 
@@ -274,33 +258,20 @@ public class MockedStructuresTest extends IgniteAbstractTest {
                 .get(1, TimeUnit.SECONDS);
     }
 
-    /**
-     * Checks inner transactions are initialized correctly.
-     */
-    @Test
-    public void testInnerTxInitiated() throws Exception {
-        SessionId sesId = queryProc.createSession(1000, PropertiesHolder.fromMap(Map.of()));
-
-        InternalTransaction tx = mock(InternalTransaction.class);
-
-        when(tm.begin()).thenReturn(tx);
-
-        String sql = "CREATE TABLE TEST (c1 int PRIMARY KEY, c2 varbinary(255))";
-
-        CompletableFuture<AsyncSqlCursor<List<Object>>> f = queryProc.querySingleAsync(sesId, QueryContext.of(), sql);
-
-        AsyncSqlCursor<List<Object>> asyncRes = f.get();
-
-        asyncRes.closeAsync();
-
-        verify(tm, never()).begin();
+    /** Dummy metastore activity mock. */
+    private void mockMetastore() throws Exception {
+        Cursor cursorMocked = mock(Cursor.class);
+        Iterator itMock = mock(Iterator.class);
+        when(itMock.hasNext()).thenReturn(false);
+        when(msm.prefix(any())).thenReturn(cursorMocked);
+        when(cursorMocked.iterator()).thenReturn(itMock);
     }
 
     /**
      * Tests create a table through public API.
      */
     @Test
-    public void testCreateTable() {
+    public void testCreateTable() throws Exception {
         SqlQueryProcessor finalQueryProc = queryProc;
 
         String curMethodName = getCurrentMethodName();
@@ -337,21 +308,6 @@ public class MockedStructuresTest extends IgniteAbstractTest {
         assertDoesNotThrow(() -> await(finalQueryProc.queryAsync("PUBLIC", finalNewTblSql3).get(0)));
     }
 
-    /**
-     * Tests create a table with multiple pk through public API.
-     */
-    @Test
-    public void testCreateTableMultiplePk() {
-        String curMethodName = getCurrentMethodName();
-
-        String newTblSql = String.format("CREATE TABLE %s (c1 int, c2 int NOT NULL DEFAULT 1, c3 int, primary key(c1, c2))", curMethodName);
-
-        readFirst(queryProc.queryAsync("PUBLIC", newTblSql));
-
-        assertTrue(tblManager.tables().stream().anyMatch(t -> t.name()
-                .equalsIgnoreCase(curMethodName)));
-    }
-
     /**
      * Tests create and drop table through public API.
      */
@@ -384,197 +340,6 @@ public class MockedStructuresTest extends IgniteAbstractTest {
                 .equalsIgnoreCase("PUBLIC." + curMethodName)));
     }
 
-    /**
-     * Tests alter and drop columns through public API.
-     */
-    @Test
-    public void testAlterAndDropSimpleCase() {
-        SqlQueryProcessor finalQueryProc = queryProc;
-
-        String curMethodName = getCurrentMethodName();
-
-        String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varchar(255))", curMethodName);
-
-        readFirst(queryProc.queryAsync("PUBLIC", newTblSql));
-
-        String alterCmd = String.format("ALTER TABLE %s ADD COLUMN (c3 varchar, c4 int)", curMethodName);
-
-        readFirst(queryProc.queryAsync("PUBLIC", alterCmd));
-
-        String alterCmd1 = String.format("ALTER TABLE %s ADD COLUMN c5 int NOT NULL DEFAULT 1", curMethodName);
-
-        readFirst(queryProc.queryAsync("PUBLIC", alterCmd1));
-
-        assertThrows(ColumnAlreadyExistsException.class, () -> readFirst(finalQueryProc.queryAsync("PUBLIC", alterCmd)));
-
-        String alterCmdNoTbl = String.format("ALTER TABLE %s ADD COLUMN (c3 varchar, c4 int)", curMethodName + "_notExist");
-
-        assertThrows(TableNotFoundException.class, () -> readFirst(queryProc.queryAsync("PUBLIC", alterCmdNoTbl)));
-
-        String alterIfExistsCmd = String.format("ALTER TABLE IF EXISTS %s ADD COLUMN (c3 varchar, c4 int)", curMethodName + "NotExist");
-
-        readFirst(queryProc.queryAsync("PUBLIC", alterIfExistsCmd));
-
-        assertThrows(ColumnAlreadyExistsException.class, () -> readFirst(finalQueryProc.queryAsync("PUBLIC", alterCmd)));
-
-        readFirst(finalQueryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s DROP COLUMN c4", curMethodName)));
-
-        readFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s ADD COLUMN IF NOT EXISTS c3 varchar", curMethodName)));
-
-        readFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s DROP COLUMN c3", curMethodName)));
-
-        readFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s DROP COLUMN IF EXISTS c3", curMethodName)));
-
-        assertThrows(ColumnNotFoundException.class, () -> readFirst(finalQueryProc.queryAsync("PUBLIC",
-                String.format("ALTER TABLE %s DROP COLUMN (c3, c4)", curMethodName))));
-
-        assertThrows(IgniteException.class, () -> readFirst(finalQueryProc.queryAsync("PUBLIC",
-                String.format("ALTER TABLE %s DROP COLUMN c1", curMethodName))));
-    }
-
-    /**
-     * Tests alter add multiple columns through public API.
-     */
-    @Test
-    public void testAlterColumnsAddBatch() {
-        String curMethodName = getCurrentMethodName();
-
-        readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varchar(255))", curMethodName)));
-
-        readFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s ADD COLUMN (c3 varchar, c4 varchar)", curMethodName)));
-
-        readFirst(queryProc
-                .queryAsync("PUBLIC", String.format("ALTER TABLE %s ADD COLUMN IF NOT EXISTS (c3 varchar, c4 varchar)", curMethodName)));
-
-        readFirst(
-                queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s ADD COLUMN IF NOT EXISTS (c3 varchar, c4 varchar, c5 varchar)",
-                        curMethodName)));
-
-        SqlQueryProcessor finalQueryProc = queryProc;
-
-        assertThrows(ColumnAlreadyExistsException.class, () -> readFirst(finalQueryProc.queryAsync("PUBLIC",
-                String.format("ALTER TABLE %s ADD COLUMN (c5 varchar)", curMethodName))));
-    }
-
-    /**
-     * Tests alter drop multiple columns through public API.
-     */
-    @Test
-    public void testAlterColumnsDropBatch() {
-        String curMethodName = getCurrentMethodName();
-
-        readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE TABLE %s "
-                + "(c1 int PRIMARY KEY, c2 decimal(10), c3 varchar, c4 varchar, c5 varchar)", curMethodName)));
-
-        readFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s DROP COLUMN c4", curMethodName)));
-
-        readFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s DROP COLUMN IF EXISTS (c3, c4, c5)", curMethodName)));
-
-        SqlQueryProcessor finalQueryProc = queryProc;
-
-        assertThrows(ColumnNotFoundException.class, () -> readFirst(finalQueryProc.queryAsync("PUBLIC",
-                String.format("ALTER TABLE %s DROP COLUMN c4", curMethodName))));
-    }
-
-    /**
-     * Tests create a table through public API.
-     */
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-16032")
-    @Test
-    public void testCreateDropIndex() {
-        SqlQueryProcessor finalQueryProc = queryProc;
-
-        String curMethodName = getCurrentMethodName();
-
-        String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with partitions=1", curMethodName);
-
-        readFirst(queryProc.queryAsync("PUBLIC", newTblSql));
-
-        assertTrue(tblManager.tables().stream().anyMatch(t -> t.name()
-                .equalsIgnoreCase("PUBLIC." + curMethodName)));
-
-        readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX index1 ON %s (c1)", curMethodName)));
-
-        readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX IF NOT EXISTS index1 ON %s (c1)", curMethodName)));
-
-        readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX index2 ON %s (c1)", curMethodName)));
-
-        readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX index3 ON %s (c2)", curMethodName)));
-
-        assertThrows(IndexAlreadyExistsException.class, () ->
-                readFirst(finalQueryProc.queryAsync("PUBLIC", String.format("CREATE INDEX index3 ON %s (c1)", curMethodName))));
-
-        assertThrows(IgniteException.class, () ->
-                readFirst(finalQueryProc
-                        .queryAsync("PUBLIC", String.format("CREATE INDEX index_3 ON %s (c1)", curMethodName + "_nonExist"))));
-
-        readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX index4 ON %s (c2 desc, c1 asc)", curMethodName)));
-
-        readFirst(queryProc.queryAsync("PUBLIC", String.format("DROP INDEX index4 ON %s", curMethodName)));
-
-        readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX index4 ON %s (c2 desc, c1 asc)", curMethodName)));
-
-        readFirst(queryProc.queryAsync("PUBLIC", String.format("DROP INDEX index4 ON %s", curMethodName)));
-
-        readFirst(queryProc.queryAsync("PUBLIC", String.format("DROP INDEX IF EXISTS index4 ON %s", curMethodName)));
-    }
-
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17197")
-    @Test
-    void createTableWithEngine() throws Exception {
-        String method = getCurrentMethodName();
-
-        // Without engine.
-        assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
-                "PUBLIC",
-                String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255))", method + 0)
-        )));
-
-        assertThat(tableView(method + 0).dataStorage(), instanceOf(RocksDbDataStorageView.class));
-
-        // With existing engine.
-        assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
-                "PUBLIC",
-                String.format(
-                        "CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) engine %s",
-                        method + 1,
-                        TestStorageEngine.ENGINE_NAME
-                )
-        )));
-
-        assertThat(tableView(method + 1).dataStorage(), instanceOf(TestDataStorageView.class));
-
-        // With existing engine in mixed case
-        assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
-                "PUBLIC",
-                String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) engine %s", method + 2, "\"RocksDb\"")
-        )));
-
-        assertThat(tableView(method + 2).dataStorage(), instanceOf(RocksDbDataStorageView.class));
-
-        IgniteException exception = assertThrows(
-                IgniteException.class,
-                () -> readFirst(queryProc.queryAsync(
-                        "PUBLIC",
-                        String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) engine %s", method + 3, method)
-                ))
-        );
-
-        assertThat(exception.getMessage(), startsWith("Unexpected data storage engine"));
-
-        tblsCfg.defaultDataStorage().update(UNKNOWN_DATA_STORAGE).get(1, TimeUnit.SECONDS);
-
-        exception = assertThrows(
-                IgniteException.class,
-                () -> readFirst(queryProc.queryAsync(
-                        "PUBLIC",
-                        String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255))", method + 4)
-                ))
-        );
-
-        assertThat(exception.getMessage(), startsWith("Default data storage is not defined"));
-    }
-
     @Test
     void createTableWithTableOptions() {
         String method = getCurrentMethodName();
@@ -660,34 +425,6 @@ public class MockedStructuresTest extends IgniteAbstractTest {
         );
     }
 
-    /**
-     * Tests that schema that is not applied yet is accessible after some time.
-     *
-     * @throws Exception If test has failed.
-     */
-    @Test
-    public void testSchemaForTheFutureUpdate() throws Exception {
-        String curMethodName = getCurrentMethodName();
-
-        readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE TABLE %s "
-                + "(c1 int PRIMARY KEY, c2 decimal(10), c3 varchar, c4 varchar, c5 varchar)", curMethodName)));
-
-        SchemaRegistry schemaRegistry = (((TableImpl) tblManager.tables().get(0)).schemaView());
-
-        runAsync(() -> {
-            Thread.sleep(3000);
-            readFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s DROP COLUMN c4", curMethodName)));
-        });
-
-        int lastSchemaVersion = schemaRegistry.lastSchemaVersion();
-
-        //Ensure that we can get schema for the future update
-        assertTrue(waitForCondition(
-                () -> schemaRegistry.schema(lastSchemaVersion + 1).version() == lastSchemaVersion + 1,
-                5000
-        ));
-    }
-
     // todo copy-paste from TableManagerTest will be removed after https://issues.apache.org/jira/browse/IGNITE-16050
 
     /**
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 35c33e3372..a9d3629fd7 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -73,7 +73,6 @@ import org.apache.ignite.configuration.ConfigurationChangeException;
 import org.apache.ignite.configuration.ConfigurationProperty;
 import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
 import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
-import org.apache.ignite.configuration.validation.ConfigurationValidationException;
 import org.apache.ignite.hlc.HybridClock;
 import org.apache.ignite.internal.affinity.AffinityUtils;
 import org.apache.ignite.internal.baseline.BaselineManager;
@@ -94,9 +93,7 @@ import org.apache.ignite.internal.raft.server.RaftGroupOptions;
 import org.apache.ignite.internal.raft.storage.impl.LogStorageFactoryCreator;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicaService;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaManager;
-import org.apache.ignite.internal.schema.SchemaUtils;
 import org.apache.ignite.internal.schema.configuration.ExtendedTableChange;
 import org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
 import org.apache.ignite.internal.schema.configuration.ExtendedTableView;
@@ -106,7 +103,6 @@ import org.apache.ignite.internal.schema.configuration.TableView;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.schema.event.SchemaEvent;
 import org.apache.ignite.internal.schema.event.SchemaEventParameters;
-import org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl;
 import org.apache.ignite.internal.storage.DataStorageManager;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RowId;
@@ -188,7 +184,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
     /**
      * If this property is set to {@code true} then an attempt to get the configuration property directly from the meta storage will be
      * skipped, and the local property will be returned.
-     * TODO: IGNITE-16774 This property and overall approach, access configuration directly through the Metostorage,
+     * TODO: IGNITE-16774 This property and overall approach, access configuration directly through the Metastorage,
      * TODO: will be removed after fix of the issue.
      */
     private final boolean getMetadataLocallyOnly = IgniteSystemProperties.getBoolean("IGNITE_GET_METADATA_LOCALLY_ONLY");
@@ -1148,34 +1144,16 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
                         extConfCh.changeTableId(intTableId);
 
+                        extConfCh.changeSchemaId(INITIAL_SCHEMA_VERSION);
+
                         tableCreateFuts.put(extConfCh.id(), tblFut);
 
                         // Affinity assignments calculation.
                         extConfCh.changeAssignments(ByteUtils.toBytes(AffinityUtils.calculateAssignments(
-                                        baselineMgr.nodes(),
-                                        tableChange.partitions(),
-                                        tableChange.replicas(),
-                                        HashSet::new)))
-                                // Table schema preparation.
-                                .changeSchemas(schemasCh -> schemasCh.create(
-                                        String.valueOf(INITIAL_SCHEMA_VERSION),
-                                        schemaCh -> {
-                                            SchemaDescriptor schemaDesc;
-
-                                            //TODO IGNITE-15747 Remove try-catch and force configuration
-                                            // validation here to ensure a valid configuration passed to
-                                            // prepareSchemaDescriptor() method.
-                                            try {
-                                                schemaDesc = SchemaUtils.prepareSchemaDescriptor(
-                                                        ((ExtendedTableView) tableChange).schemas().size(),
-                                                        tableChange);
-                                            } catch (IllegalArgumentException ex) {
-                                                throw new ConfigurationValidationException(ex.getMessage());
-                                            }
-
-                                            schemaCh.changeSchema(SchemaSerializerImpl.INSTANCE.serialize(schemaDesc));
-                                        }
-                                ));
+                                baselineMgr.nodes(),
+                                tableChange.partitions(),
+                                tableChange.replicas(),
+                                HashSet::new)));
                     });
                 })).exceptionally(t -> {
                     Throwable ex = getRootCause(t);
@@ -1210,7 +1188,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
      *                         </ul>
      * @see TableNotFoundException
      */
-    public CompletableFuture<Void> alterTableAsync(String name, Consumer<TableChange> tableChange) {
+    public CompletableFuture<Void> alterTableAsync(String name, Function<TableChange, Boolean> tableChange) {
         if (!busyLock.enterBusy()) {
             throw new IgniteException(new NodeStoppingException());
         }
@@ -1221,58 +1199,28 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         }
     }
 
-    /** See {@link #alterTableAsync(String, Consumer)} for details. */
-    private CompletableFuture<Void> alterTableAsyncInternal(String name, Consumer<TableChange> tableChange) {
+    /** See {@link #alterTableAsync(String, Function)} for details. */
+    private CompletableFuture<Void> alterTableAsyncInternal(String name, Function<TableChange, Boolean> tableChange) {
         CompletableFuture<Void> tblFut = new CompletableFuture<>();
 
         tableAsync(name).thenAccept(tbl -> {
             if (tbl == null) {
                 tblFut.completeExceptionally(new TableNotFoundException(DEFAULT_SCHEMA_NAME, name));
             } else {
-                TableImpl tblImpl = (TableImpl) tbl;
-
                 tablesCfg.tables().change(ch -> {
                     if (ch.get(name) == null) {
                         throw new TableNotFoundException(DEFAULT_SCHEMA_NAME, name);
                     }
 
                     ch.update(name, tblCh -> {
-                                tableChange.accept(tblCh);
-
-                                ((ExtendedTableChange) tblCh).changeSchemas(schemasCh ->
-                                        schemasCh.createOrUpdate(String.valueOf(schemasCh.size() + 1), schemaCh -> {
-                                            ExtendedTableView currTableView = (ExtendedTableView) tablesCfg.tables().get(name).value();
-
-                                            SchemaDescriptor descriptor;
-
-                                            //TODO IGNITE-15747 Remove try-catch and force configuration validation
-                                            // here to ensure a valid configuration passed to prepareSchemaDescriptor() method.
-                                            try {
-                                                descriptor = SchemaUtils.prepareSchemaDescriptor(
-                                                        ((ExtendedTableView) tblCh).schemas().size(),
-                                                        tblCh);
-
-                                                descriptor.columnMapping(SchemaUtils.columnMapper(
-                                                        tblImpl.schemaView().schema(currTableView.schemas().size()),
-                                                        currTableView.columns(),
-                                                        descriptor,
-                                                        tblCh.columns()));
-                                            } catch (IllegalArgumentException ex) {
-                                                // Convert unexpected exceptions here,
-                                                // because validation actually happens later,
-                                                // when bulk configuration update is applied.
-                                                ConfigurationValidationException e =
-                                                        new ConfigurationValidationException(ex.getMessage());
-
-                                                e.addSuppressed(ex);
-
-                                                throw e;
-                                            }
-
-                                            schemaCh.changeSchema(SchemaSerializerImpl.INSTANCE.serialize(descriptor));
-                                        }));
-                            }
-                    );
+                        if (!tableChange.apply(tblCh)) {
+                            return;
+                        }
+
+                        ExtendedTableChange exTblChange = (ExtendedTableChange) tblCh;
+
+                        exTblChange.changeSchemaId(exTblChange.schemaId() + 1);
+                    });
                 }).whenComplete((res, t) -> {
                     if (t != null) {
                         Throwable ex = getRootCause(t);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 1d02422b16..92ed4d5170 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -48,6 +48,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
@@ -82,11 +83,9 @@ import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaManager;
 import org.apache.ignite.internal.schema.SchemaUtils;
 import org.apache.ignite.internal.schema.configuration.ExtendedTableChange;
-import org.apache.ignite.internal.schema.configuration.ExtendedTableView;
 import org.apache.ignite.internal.schema.configuration.TableChange;
 import org.apache.ignite.internal.schema.configuration.TableView;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
-import org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl;
 import org.apache.ignite.internal.schema.testutils.SchemaConfigurationConverter;
 import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
 import org.apache.ignite.internal.schema.testutils.definition.ColumnType;
@@ -104,6 +103,7 @@ import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ClusterNode;
@@ -282,6 +282,8 @@ public class TableManagerTest extends IgniteAbstractTest {
                 SchemaBuilders.column("val", ColumnType.INT64).asNullable(true).build()
         ).withPrimaryKey("key").build();
 
+        mockMetastore();
+
         tblsCfg.tables().change(tablesChange -> {
             tablesChange.create(scmTbl.name(), tableChange -> {
                 (SchemaConfigurationConverter.convert(scmTbl, tableChange))
@@ -298,17 +300,7 @@ public class TableManagerTest extends IgniteAbstractTest {
                     assignment.add(new HashSet<>(Collections.singleton(node)));
                 }
 
-                extConfCh.changeAssignments(ByteUtils.toBytes(assignment))
-                        .changeSchemas(schemasCh -> schemasCh.create(
-                                String.valueOf(1),
-                                schemaCh -> {
-                                    SchemaDescriptor schemaDesc = SchemaUtils.prepareSchemaDescriptor(
-                                            ((ExtendedTableView) tableChange).schemas().size(),
-                                            tableChange);
-
-                                    schemaCh.changeSchema(SchemaSerializerImpl.INSTANCE.serialize(schemaDesc));
-                                }
-                        ));
+                extConfCh.changeAssignments(ByteUtils.toBytes(assignment)).changeSchemaId(1);
             });
         }).join();
 
@@ -383,15 +375,18 @@ public class TableManagerTest extends IgniteAbstractTest {
                         .changeReplicas(REPLICAS)
                         .changePartitions(PARTITIONS);
 
-        final Consumer<TableChange> addColumnChange = (TableChange change) ->
-                change.changeColumns(cols -> {
-                    int colIdx = change.columns().namedListKeys().stream().mapToInt(Integer::parseInt).max().getAsInt() + 1;
+        final Function<TableChange, Boolean> addColumnChange = (TableChange change) -> {
+            change.changeColumns(cols -> {
+                int colIdx = change.columns().namedListKeys().stream().mapToInt(Integer::parseInt).max().getAsInt() + 1;
 
-                    cols.create(String.valueOf(colIdx),
-                            colChg -> SchemaConfigurationConverter.convert(SchemaBuilders.column("name", ColumnType.string()).build(),
-                                    colChg));
+                cols.create(String.valueOf(colIdx),
+                        colChg -> SchemaConfigurationConverter.convert(SchemaBuilders.column("name", ColumnType.string()).build(),
+                                colChg));
 
-                });
+            });
+
+            return true;
+        };
 
         TableManager igniteTables = tableManager;
 
@@ -510,6 +505,8 @@ public class TableManagerTest extends IgniteAbstractTest {
                 .withPrimaryKey("key")
                 .build();
 
+        when(msm.put(any(), any())).thenReturn(completedFuture(null));
+
         Table table = mockManagersAndCreateTable(scmTbl, tblManagerFut);
 
         assertNotNull(table);
@@ -538,6 +535,15 @@ public class TableManagerTest extends IgniteAbstractTest {
         return mockManagersAndCreateTableWithDelay(tableDefinition, tblManagerFut, null);
     }
 
+    /** Stubs the metastore mock: {@code msm.prefix(...)} returns a mocked cursor whose iterator reports no elements. */
+    private void mockMetastore() throws Exception {
+        Cursor cursorMocked = mock(Cursor.class);
+        Iterator itMock = mock(Iterator.class);
+        when(itMock.hasNext()).thenReturn(false);
+        when(msm.prefix(any())).thenReturn(cursorMocked);
+        when(cursorMocked.iterator()).thenReturn(itMock);
+    }
+
     /**
      * Instantiates a table and prepares Table manager. When the latch would open, the method completes.
      *
@@ -618,6 +624,8 @@ public class TableManagerTest extends IgniteAbstractTest {
                         .changePartitions(PARTITIONS)
         );
 
+        mockMetastore();
+
         assertTrue(createTblLatch.await(10, TimeUnit.SECONDS));
 
         TableImpl tbl2 = (TableImpl) tbl2Fut.get();
@@ -758,7 +766,7 @@ public class TableManagerTest extends IgniteAbstractTest {
                 dsm = createDataStorageManager(configRegistry, workDir, rocksDbEngineConfig),
                 workDir,
                 msm,
-                sm = new SchemaManager(revisionUpdater, tblsCfg),
+                sm = new SchemaManager(revisionUpdater, tblsCfg, msm),
                 budgetView -> new LocalLogStorageFactory(),
                 null
         );