You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by zs...@apache.org on 2022/09/20 13:10:26 UTC

[ignite-3] branch main updated: IGNITE-17474 Indexes, rework configuration hierarchy - Fixes #1067.

This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new eb0640761a IGNITE-17474 Indexes, rework configuration hierarchy - Fixes #1067.
eb0640761a is described below

commit eb0640761a62cfa7817720969e9158e7ad6228cc
Author: zstan <st...@gmail.com>
AuthorDate: Tue Sep 20 15:48:02 2022 +0300

    IGNITE-17474 Indexes, rework configuration hierarchy - Fixes #1067.
    
    Signed-off-by: zstan <st...@gmail.com>
---
 .../schemas/table/IndexValidator.java}             |  31 +-
 .../schemas/table/TableConfigurationSchema.java    |   4 -
 .../table/TableIndexConfigurationSchema.java       |   4 +
 .../schemas/table/TablesConfigurationSchema.java   |   5 +
 .../apache/ignite/lang/TableNotFoundException.java |   2 +-
 .../ignite/schema/definition/TableDefinition.java  |  10 -
 .../definition/index/ColumnarIndexDefinition.java  |   2 +-
 .../schema/definition/index/IndexDefinition.java   |   2 +-
 .../apache/ignite/client/ClientComputeTest.java    |   2 +-
 .../configuration/util/ConfigurationUtil.java      |   7 +
 modules/index/build.gradle                         |   2 +
 modules/index/pom.xml                              |  14 +
 .../apache/ignite/internal/index/IndexManager.java | 281 ++++++----------
 .../internal/index/SortedIndexDescriptor.java      |   2 +-
 .../ignite/internal/index/IndexManagerTest.java    | 358 ++++++++++-----------
 .../ignite/internal/compute/ItComputeTest.java     |   2 +-
 .../storage/ItRebalanceDistributedTest.java        |   4 -
 .../ignite/internal/index/ItIndexManagerTest.java  |   4 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   2 +-
 .../internal/runner/app/ItTablesApiTest.java       |  78 +++--
 .../internal/sql/api/ItSqlAsynchronousApiTest.java |  13 +-
 .../internal/sql/api/ItSqlSynchronousApiTest.java  |  13 +-
 .../sql/engine/AbstractBasicIntegrationTest.java   |  17 +
 .../internal/sql/engine/ItOrToUnionRuleTest.java   |  18 +-
 .../sql/engine/ItProjectScanMergeRuleTest.java     |   6 -
 .../internal/sql/engine/ItSecondaryIndexTest.java  |  29 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  21 +-
 .../apache/ignite/internal/schema/SchemaUtils.java |  12 +
 .../schema/configuration/IndexValidatorImpl.java   |  94 ++++++
 .../SchemaConfigurationConverter.java              |  42 +--
 .../SchemaDistributedConfigurationModule.java      |   4 +-
 .../schema/configuration/TableValidatorImpl.java   |   3 -
 .../schema/definition/SchemaValidationUtils.java   |  73 ++++-
 .../schema/definition/TableDefinitionImpl.java     |  18 +-
 .../internal/schema/SchemaConfigurationTest.java   |  30 +-
 ...t.java => AbstractTableIndexValidatorTest.java} |  50 +--
 .../configuration/IndexValidatorImplTest.java      |  65 ++++
 .../SchemaConfigurationConverterTest.java          | 106 +++---
 .../configuration/TableValidatorImplTest.java      |  38 +--
 .../testutils/builder/TableDefinitionBuilder.java  |   9 -
 .../builder/TableDefinitionBuilderImpl.java        |  20 +-
 .../internal/sql/engine/SqlQueryProcessor.java     |   5 +-
 .../sql/engine/exec/ExecutionServiceImpl.java      |   7 +
 .../sql/engine/exec/ddl/DdlCommandHandler.java     |   3 +-
 .../sql/engine/schema/SqlSchemaManagerImpl.java    |   6 +-
 .../sql/engine/exec/MockedStructuresTest.java      |   2 +-
 .../engine/exec/schema/SqlSchemaManagerTest.java   |  24 +-
 .../internal/storage/engine/StorageEngine.java     |   4 +-
 .../storage/index/HashIndexDescriptor.java         |  14 +-
 .../storage/index/SortedIndexDescriptor.java       |  10 +-
 .../ConcurrentHashMapMvTableStorageTest.java       |  13 +-
 .../index/AbstractSortedIndexStorageTest.java      |  29 +-
 .../storage/index/TestHashIndexStorageTest.java    |  20 +-
 .../storage/index/TestSortedIndexStorageTest.java  |  40 ++-
 .../storage/AbstractMvTableStorageTest.java        | 137 +++++---
 .../chm/TestConcurrentHashMapMvTableStorage.java   |  17 +-
 .../chm/TestConcurrentHashMapStorageEngine.java    |   5 +-
 .../index/AbstractHashIndexStorageTest.java        |  23 +-
 .../pagememory/AbstractPageMemoryTableStorage.java |   8 +-
 .../PersistentPageMemoryStorageEngine.java         |   8 +-
 .../PersistentPageMemoryTableStorage.java          |   9 +-
 .../VolatilePageMemoryStorageEngine.java           |   6 +-
 .../pagememory/VolatilePageMemoryTableStorage.java |  10 +-
 .../mv/AbstractPageMemoryMvPartitionStorage.java   |  14 +-
 .../mv/PersistentPageMemoryMvPartitionStorage.java |   6 +-
 .../mv/VolatilePageMemoryMvPartitionStorage.java   |   5 +-
 .../PersistentPageMemoryHashIndexStorageTest.java  |  26 +-
 .../VolatilePageMemoryHashIndexStorageTest.java    |  19 +-
 ...PersistentPageMemoryMvPartitionStorageTest.java |   2 +-
 .../VolatilePageMemoryMvPartitionStorageTest.java  |   2 +-
 .../rocksdb/{HashIndices.java => HashIndexes.java} |   4 +-
 .../storage/rocksdb/RocksDbStorageEngine.java      |   5 +-
 .../storage/rocksdb/RocksDbTableStorage.java       |  25 +-
 .../rocksdb/RocksDbMvPartitionStorageTest.java     |   2 +-
 .../storage/rocksdb/RocksDbMvTableStorageTest.java |  18 +-
 .../storage/rocksdb/RocksDbStorageEngineTest.java  |   4 +-
 .../rocksdb/index/RocksDbHashIndexStorageTest.java |  22 +-
 .../internal/table/distributed/TableManager.java   |  13 +-
 78 files changed, 1152 insertions(+), 912 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/schema/definition/index/ColumnarIndexDefinition.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/IndexValidator.java
similarity index 59%
copy from modules/api/src/main/java/org/apache/ignite/schema/definition/index/ColumnarIndexDefinition.java
copy to modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/IndexValidator.java
index ec2819c975..e2076301df 100644
--- a/modules/api/src/main/java/org/apache/ignite/schema/definition/index/ColumnarIndexDefinition.java
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/IndexValidator.java
@@ -15,27 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.schema.definition.index;
+package org.apache.ignite.configuration.schemas.table;
 
-import java.util.List;
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
 
 /**
- * Columnar index interface.
+ * Annotation to validate whole index configuration.
+ *
+ * <p>Activate IndexValidatorImpl in configuration engine for {@link TablesConfigurationSchema#indexes}.
  */
-public interface ColumnarIndexDefinition extends IndexDefinition {
-    /**
-     * Configured index columns.
-     *
-     * @return Index columns.
-     */
-    List<? extends IndexColumnDefinition> columns();
+@Target({FIELD, PARAMETER})
+@Retention(RUNTIME)
+public @interface IndexValidator {
 
-    /**
-     * Returns all index columns: user defined + implicitly attched.
-     *
-     * @return Indexed columns.
-     */
-    default List<? extends IndexColumnDefinition> indexedColumns() {
-        return columns();
-    }
 }
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/TableConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/TableConfigurationSchema.java
index b74fe1006f..4c2d7a744d 100644
--- a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/TableConfigurationSchema.java
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/TableConfigurationSchema.java
@@ -65,10 +65,6 @@ public class TableConfigurationSchema {
     @ConfigValue
     public PrimaryKeyConfigurationSchema primaryKey;
 
-    /** Indices configuration. */
-    @NamedConfigValue
-    public TableIndexConfigurationSchema indices;
-
     // TODO: IGNITE-16647 - RAFT configuration should be moved elsewhere
 
     /** Configuration for Raft groups corresponding to table partitions. */
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/TableIndexConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/TableIndexConfigurationSchema.java
index d08fdac13c..daebd70acb 100644
--- a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/TableIndexConfigurationSchema.java
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/TableIndexConfigurationSchema.java
@@ -47,6 +47,10 @@ public class TableIndexConfigurationSchema {
     @InjectedName
     public String name;
 
+    /** Table id the index associated with. */
+    @Value
+    public UUID tableId;
+
     /** Has default value flag. */
     @Value(hasDefault = true)
     public boolean uniq = false;
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/TablesConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/TablesConfigurationSchema.java
index 3ce5460336..4772d8c9a4 100644
--- a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/TablesConfigurationSchema.java
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/TablesConfigurationSchema.java
@@ -38,6 +38,11 @@ public class TablesConfigurationSchema {
     @TableValidator
     public TableConfigurationSchema tables;
 
+    /** List of configured indexes. */
+    @NamedConfigValue
+    @IndexValidator
+    public TableIndexConfigurationSchema indexes;
+
     /** Default data storage for tables. */
     @ExistingDataStorage
     @Value(hasDefault = true)
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/TableNotFoundException.java b/modules/api/src/main/java/org/apache/ignite/lang/TableNotFoundException.java
index 8a92ba6e31..1f6b1e0992 100644
--- a/modules/api/src/main/java/org/apache/ignite/lang/TableNotFoundException.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/TableNotFoundException.java
@@ -31,7 +31,7 @@ public class TableNotFoundException extends IgniteException {
      * @param name Table name.
      */
     public TableNotFoundException(String name) {
-        super(TABLE_NOT_FOUND_ERR, "Table does not exist [name=" + name + ']');
+        super(TABLE_NOT_FOUND_ERR, "The table does not exist [name=" + name + ']');
     }
 
     /**
diff --git a/modules/api/src/main/java/org/apache/ignite/schema/definition/TableDefinition.java b/modules/api/src/main/java/org/apache/ignite/schema/definition/TableDefinition.java
index 867c5ffbb5..f8f288f17d 100644
--- a/modules/api/src/main/java/org/apache/ignite/schema/definition/TableDefinition.java
+++ b/modules/api/src/main/java/org/apache/ignite/schema/definition/TableDefinition.java
@@ -17,16 +17,13 @@
 
 package org.apache.ignite.schema.definition;
 
-import java.util.Collection;
 import java.util.List;
 import java.util.Set;
-import org.apache.ignite.schema.definition.index.IndexDefinition;
 
 /**
  * Table schema configuration.
  */
 public interface TableDefinition extends SchemaObject {
-
     /**
      * Returns table name.
      *
@@ -55,11 +52,4 @@ public interface TableDefinition extends SchemaObject {
      * @return List of columns.
      */
     List<ColumnDefinition> columns();
-
-    /**
-     * Returns table indices.
-     *
-     * @return Collection of indexes.
-     */
-    Collection<IndexDefinition> indices();
 }
diff --git a/modules/api/src/main/java/org/apache/ignite/schema/definition/index/ColumnarIndexDefinition.java b/modules/api/src/main/java/org/apache/ignite/schema/definition/index/ColumnarIndexDefinition.java
index ec2819c975..8998a170cb 100644
--- a/modules/api/src/main/java/org/apache/ignite/schema/definition/index/ColumnarIndexDefinition.java
+++ b/modules/api/src/main/java/org/apache/ignite/schema/definition/index/ColumnarIndexDefinition.java
@@ -31,7 +31,7 @@ public interface ColumnarIndexDefinition extends IndexDefinition {
     List<? extends IndexColumnDefinition> columns();
 
     /**
-     * Returns all index columns: user defined + implicitly attched.
+     * Returns all index columns: user defined + implicitly attached.
      *
      * @return Indexed columns.
      */
diff --git a/modules/api/src/main/java/org/apache/ignite/schema/definition/index/IndexDefinition.java b/modules/api/src/main/java/org/apache/ignite/schema/definition/index/IndexDefinition.java
index 6421f04934..ad3b095735 100644
--- a/modules/api/src/main/java/org/apache/ignite/schema/definition/index/IndexDefinition.java
+++ b/modules/api/src/main/java/org/apache/ignite/schema/definition/index/IndexDefinition.java
@@ -20,7 +20,7 @@ package org.apache.ignite.schema.definition.index;
 import org.apache.ignite.schema.definition.SchemaObject;
 
 /**
- * Table index base interface.
+ * Index base interface.
  */
 public interface IndexDefinition extends SchemaObject {
     /**
diff --git a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
index 2ff50ba63a..909b31ad93 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
@@ -130,7 +130,7 @@ public class ClientComputeTest {
                     () -> client.compute().<String>executeColocated("bad-tbl", key, "job").join());
 
             var tblNotFoundEx = (TableNotFoundException) ex.getCause();
-            assertThat(tblNotFoundEx.getMessage(), containsString("Table does not exist [name=bad-tbl]"));
+            assertThat(tblNotFoundEx.getMessage(), containsString("The table does not exist [name=bad-tbl]"));
             assertEquals(TABLE_NOT_FOUND_ERR, tblNotFoundEx.code());
         }
     }
diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/util/ConfigurationUtil.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/util/ConfigurationUtil.java
index 18912ccd6e..557ef7fc2f 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/util/ConfigurationUtil.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/util/ConfigurationUtil.java
@@ -1163,6 +1163,13 @@ public class ConfigurationUtil {
         return node.get(((NamedListNode<?>) node).keyByInternalId(internalId));
     }
 
+    /**
+     * Returns internal id for the value associated with the passed key.
+     */
+    public static <N> UUID getInternalId(NamedListView<N> node, String name) {
+        return ((NamedListNode<?>) node).internalId(name);
+    }
+
     /**
      * Returns all internal ids of the elements from the list.
      */
diff --git a/modules/index/build.gradle b/modules/index/build.gradle
index 8136cffa86..3542181844 100644
--- a/modules/index/build.gradle
+++ b/modules/index/build.gradle
@@ -26,6 +26,8 @@ dependencies {
     implementation project(':ignite-configuration')
     implementation project(':ignite-extended-api')
     implementation libs.jetbrains.annotations
+    testImplementation(testFixtures(project(':ignite-configuration')))
+    testImplementation(testFixtures(project(':ignite-core')))
     testImplementation libs.mockito.core
     testImplementation libs.mockito.inline
     testImplementation libs.mockito.junit
diff --git a/modules/index/pom.xml b/modules/index/pom.xml
index d3e066b0cd..4c6e7628c0 100644
--- a/modules/index/pom.xml
+++ b/modules/index/pom.xml
@@ -49,6 +49,20 @@
         </dependency>
 
         <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-configuration</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
         <dependency>
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter-engine</artifactId>
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index c62d142e14..216370bfa0 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -18,17 +18,15 @@
 package org.apache.ignite.internal.index;
 
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.getByInternalId;
 import static org.apache.ignite.internal.schema.SchemaUtils.canonicalName;
+import static org.apache.ignite.internal.util.IgniteObjectName.parseCanonicalName;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
@@ -37,22 +35,21 @@ import org.apache.ignite.configuration.schemas.table.HashIndexView;
 import org.apache.ignite.configuration.schemas.table.IndexColumnView;
 import org.apache.ignite.configuration.schemas.table.SortedIndexView;
 import org.apache.ignite.configuration.schemas.table.TableIndexChange;
+import org.apache.ignite.configuration.schemas.table.TableIndexConfiguration;
 import org.apache.ignite.configuration.schemas.table.TableIndexView;
-import org.apache.ignite.internal.configuration.schema.ExtendedTableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
 import org.apache.ignite.internal.index.event.IndexEvent;
 import org.apache.ignite.internal.index.event.IndexEventParameters;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.manager.Producer;
-import org.apache.ignite.internal.table.distributed.TableManager;
-import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.StringUtils;
 import org.apache.ignite.lang.ErrorGroups;
 import org.apache.ignite.lang.ErrorGroups.Common;
-import org.apache.ignite.lang.ErrorGroups.Table;
-import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IndexAlreadyExistsException;
 import org.apache.ignite.lang.IndexNotFoundException;
@@ -67,12 +64,8 @@ import org.jetbrains.annotations.NotNull;
 public class IndexManager extends Producer<IndexEvent, IndexEventParameters> implements IgniteComponent {
     private static final IgniteLogger LOG = Loggers.forClass(IndexManager.class);
 
-    private final TableManager tableManager;
-
-    private final Consumer<ConfigurationNamedListListener<TableIndexView>> indicesConfigurationChangeSubscription;
-
-    private final Map<UUID, Index<?>> indexById = new ConcurrentHashMap<>();
-    private final Map<String, Index<?>> indexByName = new ConcurrentHashMap<>();
+    /** Common tables and indexes configuration. */
+    private final TablesConfiguration tablesCfg;
 
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -83,14 +76,10 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
     /**
      * Constructor.
      *
-     * @param tableManager Table manager.
+     * @param tablesCfg Tables and indexes configuration.
      */
-    public IndexManager(
-            TableManager tableManager,
-            Consumer<ConfigurationNamedListListener<TableIndexView>> indicesConfigurationChangeSubscription
-    ) {
-        this.tableManager = Objects.requireNonNull(tableManager, "tableManager");
-        this.indicesConfigurationChangeSubscription = Objects.requireNonNull(indicesConfigurationChangeSubscription, "tablesConfiguration");
+    public IndexManager(TablesConfiguration tablesCfg) {
+        this.tablesCfg = Objects.requireNonNull(tablesCfg, "tablesCfg");
     }
 
     /** {@inheritDoc} */
@@ -98,7 +87,7 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
     public void start() {
         LOG.debug("Index manager is about to start");
 
-        indicesConfigurationChangeSubscription.accept(new ConfigurationListener());
+        tablesCfg.indexes().listenElements(new ConfigurationListener());
 
         LOG.info("Index manager started");
     }
@@ -116,9 +105,6 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
 
         busyLock.block();
 
-        indexById.clear();
-        indexByName.clear();
-
         LOG.info("Index manager stopped");
     }
 
@@ -132,12 +118,7 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
      * @param indexChange A consumer that suppose to change the configuration in order to provide description of an index.
      * @return A future represented the result of creation.
      */
-    // TODO: https://issues.apache.org/jira/browse/IGNITE-17474
-    // Validation of the index name uniqueness is not implemented, because with given
-    // configuration hierarchy this is a bit tricky exercise. Given that this hierarchy
-    // is subject to change in the future, seems to be more rational just to omit this
-    // part for now
-    public CompletableFuture<Index<?>> createIndexAsync(
+    public CompletableFuture<Boolean> createIndexAsync(
             String schemaName,
             String indexName,
             String tableName,
@@ -153,73 +134,65 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
         try {
             validateName(indexName);
 
-            CompletableFuture<Index<?>> future = new CompletableFuture<>();
+            // TODO: IGNITE-17677 Refactoring of usage IgniteObjectName utility class
+            String canonicalIndexName = parseCanonicalName(canonicalName(schemaName, indexName));
+
+            // TODO: IGNITE-17677 Refactoring of usage IgniteObjectName utility class
+            String canonicalName = parseCanonicalName(canonicalName(schemaName, tableName));
 
-            var canonicalName = canonicalName(schemaName, tableName);
+            CompletableFuture<Boolean> future = new CompletableFuture<>();
 
-            tableManager.tableAsyncInternal(canonicalName).thenAccept((table) -> {
-                if (table == null) {
-                    var exception = new TableNotFoundException(canonicalName);
+            // Check index existence flag, avoid usage of hasCause + IndexAlreadyExistsException.
+            AtomicBoolean idxExist = new AtomicBoolean(false);
 
-                    LOG.info("Unable to create index [schema={}, table={}, index={}]",
-                            exception, schemaName, tableName, indexName);
+            tablesCfg.indexes().change(indexListChange -> {
+                idxExist.set(false);
 
-                    future.completeExceptionally(exception);
+                UUID tableId;
 
-                    return;
+                try {
+                    tableId = ConfigurationUtil.getInternalId(tablesCfg.tables().value(), canonicalName);
+                } catch (IllegalArgumentException e) {
+                    throw new TableNotFoundException(canonicalName);
                 }
 
-                tableManager.alterTableAsync(table.name(), tableChange -> tableChange.changeIndices(indexListChange -> {
-                    if (indexListChange.get(indexName) != null) {
-                        if (!failIfExists) {
-                            future.complete(null);
+                if (indexListChange.get(canonicalIndexName) != null) {
+                    idxExist.set(true);
 
-                            return;
-                        }
-                        var exception = new IndexAlreadyExistsException(indexName);
+                    throw new IndexAlreadyExistsException(canonicalIndexName);
+                }
 
-                        LOG.info("Unable to create index [schema={}, table={}, index={}]",
-                                exception, schemaName, tableName, indexName);
+                Consumer<TableIndexChange> chg = indexChange.andThen(c -> c.changeTableId(tableId));
 
-                        future.completeExceptionally(exception);
+                indexListChange.create(canonicalIndexName, chg);
+            }).whenComplete((index, th) -> {
+                if (th != null) {
+                    LOG.info("Unable to create index [schema={}, table={}, index={}]",
+                            th, schemaName, tableName, indexName);
 
-                        return;
+                    if (!failIfExists && idxExist.get()) {
+                        future.complete(false);
+                    } else {
+                        future.completeExceptionally(th);
                     }
+                } else if (!future.isDone()) {
+                    TableIndexConfiguration idxCfg = tablesCfg.indexes().get(canonicalIndexName);
 
-                    indexListChange.create(indexName, indexChange);
+                    if (idxCfg != null && idxCfg.value() != null) {
+                        LOG.info("Index created [schema={}, table={}, index={}, indexId={}]",
+                                schemaName, tableName, indexName, idxCfg.id().value());
 
-                    TableIndexView indexView = indexListChange.get(indexName);
-
-                    Set<String> columnNames = Set.copyOf(tableChange.columns().namedListKeys());
+                        future.complete(true);
+                    } else {
+                        var exception = new IgniteInternalException(
+                                Common.UNEXPECTED_ERR, "Looks like the index was concurrently deleted");
 
-                    validateColumns(indexView, columnNames);
-                })).whenComplete((index, th) -> {
-                    if (th != null) {
                         LOG.info("Unable to create index [schema={}, table={}, index={}]",
-                                th, schemaName, tableName, indexName);
-
-                        future.completeExceptionally(th);
-                    } else if (!future.isDone()) {
-                        String canonicalIndexName = canonicalName(schemaName, indexName);
-
-                        Index<?> createdIndex = indexByName.get(canonicalIndexName);
-
-                        if (createdIndex != null) {
-                            LOG.info("Index created [schema={}, table={}, index={}, indexId={}]",
-                                    schemaName, tableName, indexName, createdIndex.id());
-
-                            future.complete(createdIndex);
-                        } else {
-                            var exception = new IgniteInternalException(
-                                    Common.UNEXPECTED_ERR, "Looks like the index was concurrently deleted");
-
-                            LOG.info("Unable to create index [schema={}, table={}, index={}]",
-                                    exception, schemaName, tableName, indexName);
+                                exception, schemaName, tableName, indexName);
 
-                            future.completeExceptionally(exception);
-                        }
+                        future.completeExceptionally(exception);
                     }
-                });
+                }
             });
 
             return future;
@@ -238,8 +211,6 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
      * @param failIfNotExists Flag, which force failure, when {@code trues} if index doen't not exists.
      * @return A future representing the result of the operation.
      */
-    // TODO: https://issues.apache.org/jira/browse/IGNITE-17474
-    // For now it is impossible to locate the index neither by id nor name.
     public CompletableFuture<Boolean> dropIndexAsync(
             String schemaName,
             String indexName,
@@ -254,55 +225,46 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
         try {
             validateName(indexName);
 
-            String canonicalName = canonicalName(schemaName, indexName);
+            String canonicalName = parseCanonicalName(canonicalName(schemaName, indexName));
 
-            Index<?> index = indexByName.get(canonicalName);
+            TableIndexConfiguration idxCfg = tablesCfg.indexes().get(canonicalName);
 
-            if (index == null) {
+            if (idxCfg == null) {
                 return  failIfNotExists
                         ? CompletableFuture.failedFuture(new IndexNotFoundException(canonicalName))
                         : CompletableFuture.completedFuture(false);
             }
 
-            CompletableFuture<Boolean> future = new CompletableFuture<>();
-
-            tableManager.tableAsyncInternal(index.tableId(), false).thenAccept((table) -> {
-                if (table == null) {
-                    var exception = new IndexNotFoundException(canonicalName);
-
-                    LOG.info("Unable to drop index [schema={}, index={}]",
-                            exception, schemaName, indexName);
+            final CompletableFuture<Boolean> future = new CompletableFuture<>();
 
-                    future.completeExceptionally(exception);
-
-                    return;
-                }
+            final UUID tableId = idxCfg.tableId().value();
 
-                tableManager.alterTableAsync(table.name(), tableChange -> tableChange.changeIndices(indexListChange -> {
-                    if (indexListChange.get(indexName) == null) {
-                        var exception = new IndexNotFoundException(canonicalName);
+            tablesCfg.indexes().change(indexListChange -> {
+                TableView tableView = getByInternalId(tablesCfg.tables().value(), tableId);
 
-                        LOG.info("Unable to drop index [schema={}, index={}]",
-                                exception, schemaName, indexName);
-
-                        future.completeExceptionally(exception);
-
-                        return;
+                if (tableView == null) {
+                    if (failIfNotExists) {
+                        throw new TableNotFoundException(canonicalName);
+                    } else {
+                        future.complete(true);
                     }
+                }
 
-                    indexListChange.delete(indexName);
-                })).whenComplete((ignored, th) -> {
-                    if (th != null) {
-                        LOG.info("Unable to drop index [schema={}, index={}]",
-                                th, schemaName, indexName);
+                if (indexListChange.get(canonicalName) == null) {
+                    throw new IndexNotFoundException(canonicalName);
+                } else if (tableView != null) {
+                    indexListChange.delete(canonicalName);
+                }
+            }).whenComplete((ignored, th) -> {
+                if (th != null) {
+                    LOG.info("Unable to drop index [schema={}, index={}]", th, schemaName, indexName);
 
-                        future.completeExceptionally(th);
-                    } else if (!future.isDone()) {
-                        LOG.info("Index dropped [schema={}, index={}]", schemaName, indexName);
+                    future.completeExceptionally(th);
+                } else if (!future.isDone()) {
+                    LOG.info("Index dropped [schema={}, index={}]", schemaName, indexName);
 
-                        future.complete(true);
-                    }
-                });
+                    future.complete(true);
+                }
             });
 
             return future;
@@ -320,36 +282,6 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
         }
     }
 
-    private void validateColumns(TableIndexView indexView, Set<String> tableColumns) {
-        if (indexView instanceof SortedIndexView) {
-            var sortedIndexView = (SortedIndexView) indexView;
-
-            validateColumns(sortedIndexView.columns().namedListKeys(), tableColumns);
-        } else if (indexView instanceof HashIndexView) {
-            validateColumns(Arrays.asList(((HashIndexView) indexView).columnNames()), tableColumns);
-        } else {
-            throw new AssertionError("Unknown index type [type=" + (indexView != null ? indexView.getClass() : null) + ']');
-        }
-    }
-
-    private void validateColumns(Iterable<String> indexedColumns, Set<String> tableColumns) {
-        if (CollectionUtils.nullOrEmpty(indexedColumns)) {
-            throw new IgniteInternalException(
-                    ErrorGroups.Index.INVALID_INDEX_DEFINITION_ERR,
-                    "At least one column should be specified by index definition"
-            );
-        }
-
-        for (var columnName : indexedColumns) {
-            if (!tableColumns.contains(columnName)) {
-                throw new IgniteInternalException(
-                        Table.COLUMN_NOT_FOUND_ERR,
-                        "Column not found [name=" + columnName + ']'
-                );
-            }
-        }
-    }
-
     /**
      * Callback method is called when index configuration changed and an index was dropped.
      *
@@ -357,9 +289,9 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
      * @return A future.
      */
     private CompletableFuture<?> onIndexDrop(ConfigurationNotificationEvent<TableIndexView> evt) {
-        if (!busyLock.enterBusy()) {
-            UUID idxId = evt.oldValue().id();
+        UUID idxId = evt.oldValue().id();
 
+        if (!busyLock.enterBusy()) {
             fireEvent(IndexEvent.DROP,
                     new IndexEventParameters(evt.storageRevision(), idxId),
                     new NodeStoppingException()
@@ -369,11 +301,7 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
         }
 
         try {
-            Index<?> index = indexById.remove(evt.oldValue().id());
-
-            indexByName.remove(index.name(), index);
-
-            fireEvent(IndexEvent.DROP, new IndexEventParameters(evt.storageRevision(), index.id()), null);
+            fireEvent(IndexEvent.DROP, new IndexEventParameters(evt.storageRevision(), idxId), null);
         } finally {
             busyLock.leaveBusy();
         }
@@ -400,17 +328,17 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
         }
 
         try {
+            UUID tableId = evt.newValue().tableId();
+
             return createIndexLocally(
                     evt.storageRevision(),
-                    //TODO: https://issues.apache.org/jira/browse/IGNITE-17474 Add tableID to index config instead of lookup to parent.
-                    evt.config(ExtendedTableConfiguration.class).id().value(), // Parent element is table.
+                    tableId,
                     evt.newValue());
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    @NotNull
     private CompletableFuture<?> createIndexLocally(long causalityToken, UUID tableId, TableIndexView tableIndexView) {
         assert tableIndexView != null;
 
@@ -419,14 +347,6 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
 
         Index<?> index = newIndex(tableId, tableIndexView);
 
-        Index<?> prev = indexById.putIfAbsent(index.id(), index);
-
-        assert prev == null;
-
-        prev = indexByName.putIfAbsent(index.name(), index);
-
-        assert prev == null;
-
         fireEvent(IndexEvent.CREATE, new IndexEventParameters(causalityToken, index), null);
 
         return CompletableFuture.completedFuture(null);
@@ -452,14 +372,15 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
 
     private IndexDescriptor convert(HashIndexView indexView) {
         return new IndexDescriptor(
-                canonicalName("PUBLIC", indexView.name()),
+                indexView.name(),
                 Arrays.asList(indexView.columnNames())
         );
     }
 
     private SortedIndexDescriptor convert(SortedIndexView indexView) {
-        var indexedColumns = new ArrayList<String>();
-        var collations = new ArrayList<ColumnCollation>();
+        int colsCount = indexView.columns().size();
+        var indexedColumns = new ArrayList<String>(colsCount);
+        var collations = new ArrayList<ColumnCollation>(colsCount);
 
         for (var columnName : indexView.columns().namedListKeys()) {
             IndexColumnView columnView = indexView.columns().get(columnName);
@@ -469,32 +390,12 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
         }
 
         return new SortedIndexDescriptor(
-                canonicalName("PUBLIC", indexView.name()),
+                indexView.name(),
                 indexedColumns,
                 collations
         );
     }
 
-    /**
-     * Waits for future result and return, or unwraps {@link CompletionException} to {@link IgniteException} if failed.
-     *
-     * @param future Completable future.
-     * @return Future result.
-     */
-    private <T> T join(CompletableFuture<T> future) {
-        if (!busyLock.enterBusy()) {
-            throw new IgniteInternalException(Common.NODE_STOPPING_ERR, new NodeStoppingException());
-        }
-
-        try {
-            return future.join();
-        } catch (CompletionException ex) {
-            throw IgniteException.wrap(ex.getCause());
-        } finally {
-            busyLock.leaveBusy();
-        }
-    }
-
     private class ConfigurationListener implements ConfigurationNamedListListener<TableIndexView> {
         /** {@inheritDoc} */
         @Override
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexDescriptor.java b/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexDescriptor.java
index 82c03c8431..f859cf6774 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexDescriptor.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexDescriptor.java
@@ -33,7 +33,7 @@ public class SortedIndexDescriptor extends IndexDescriptor {
      * Constructs a index description.
      *
      * @param name Name of the index.
-     * @param columns A list of indexed columns. Must not contains duplicates.
+     * @param columns A list of indexed columns. Must not contain duplicates.
      * @param collations A list of columns collations. Must be the same size as columns list.
      * @throws IllegalArgumentException If columns list contains duplicates or columns size doesn't match the collations size.
      */
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
index e14174033a..fc79ebdd4f 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
@@ -17,18 +17,19 @@
 
 package org.apache.ignite.internal.index;
 
+import static org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static org.apache.ignite.internal.schema.SchemaUtils.canonicalName;
+import static org.apache.ignite.internal.util.IgniteObjectName.parseCanonicalName;
 import static org.apache.ignite.lang.IgniteStringFormatter.format;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -39,106 +40,119 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
-import org.apache.ignite.configuration.ConfigurationValue;
-import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
-import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
-import org.apache.ignite.configuration.schemas.store.DataStorageConfigurationSchema;
+import org.apache.ignite.configuration.NamedConfigurationTree;
 import org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.ColumnDefaultConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.ConstantValueDefaultConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.EntryCountBudgetConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.FunctionCallDefaultConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.HashIndexChange;
 import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.HashIndexView;
-import org.apache.ignite.configuration.schemas.table.LogStorageBudgetConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.IndexValidator;
 import org.apache.ignite.configuration.schemas.table.NullValueDefaultConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.SortedIndexChange;
 import org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.TableChange;
-import org.apache.ignite.configuration.schemas.table.TableConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.TableIndexConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.TableIndexView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableView;
 import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.configuration.schemas.table.UnlimitedBudgetConfigurationSchema;
-import org.apache.ignite.internal.configuration.asm.ConfigurationAsmGenerator;
-import org.apache.ignite.internal.configuration.schema.ExtendedTableConfiguration;
+import org.apache.ignite.configuration.validation.ConfigurationValidationException;
+import org.apache.ignite.internal.configuration.ConfigurationRegistry;
+import org.apache.ignite.internal.configuration.NamedListConfiguration;
+import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.tree.ConverterToMapVisitor;
 import org.apache.ignite.internal.configuration.tree.TraversableTreeNode;
 import org.apache.ignite.internal.index.event.IndexEvent;
 import org.apache.ignite.internal.index.event.IndexEventParameters;
-import org.apache.ignite.internal.table.TableImpl;
-import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.schema.configuration.IndexValidatorImpl;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.lang.ErrorGroups;
 import org.apache.ignite.lang.ErrorGroups.Table;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IndexNotFoundException;
 import org.apache.ignite.lang.TableNotFoundException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
  * Test class to verify {@link IndexManager}.
  */
+@ExtendWith(ConfigurationExtension.class)
 public class IndexManagerTest {
-    @Test
-    void indexManagerSubscribedToIdxConfigurationChangesOnStart() {
-        var indicesConfigurationChangeSubscription = new AtomicReference<>();
+    /** Configuration registry with one table for each test. */
+    private ConfigurationRegistry confRegistry;
+
+    /**
+     * Prepare configuration registry for test.
+     */
+    @BeforeEach
+    public void createRegistry() {
+        confRegistry = new ConfigurationRegistry(
+                List.of(TablesConfiguration.KEY),
+                Map.of(IndexValidator.class, Set.of(IndexValidatorImpl.INSTANCE)),
+                new TestConfigurationStorage(DISTRIBUTED),
+                List.of(),
+                List.of(
+                        HashIndexConfigurationSchema.class,
+                        SortedIndexConfigurationSchema.class,
 
-        var indexManager = new IndexManager(mock(TableManager.class), indicesConfigurationChangeSubscription::set);
+                        UnknownDataStorageConfigurationSchema.class,
+                        ConstantValueDefaultConfigurationSchema.class,
+                        FunctionCallDefaultConfigurationSchema.class,
+                        NullValueDefaultConfigurationSchema.class,
+                        UnlimitedBudgetConfigurationSchema.class,
+                        EntryCountBudgetConfigurationSchema.class
+                )
+        );
 
-        indexManager.start();
+        confRegistry.start();
+    }
 
-        assertThat(indicesConfigurationChangeSubscription.get(), notNullValue());
+    @AfterEach
+    void tearDown() throws Exception {
+        confRegistry.stop();
     }
 
     @Test
-    void configurationChangedWhenCreateIsInvoked() {
-        var tableManagerMock = mock(TableManager.class);
-        var canonicalName = "sName.tName";
+    void configurationChangedWhenCreateIsInvoked() throws Exception {
+        TablesConfiguration tablesConfig = confRegistry.getConfiguration(TablesConfiguration.KEY);
 
-        TableChange tableChange = createNode(TableConfigurationSchema.class);
+        var indexManager = new IndexManager(tablesConfig);
+        indexManager.start();
 
-        tableChange.changeColumns(columnListChange -> {
-            columnListChange.create("c1", columnChange -> {});
-            columnListChange.create("c2", columnChange -> {});
-        });
+        tablesConfig.tables().change(tableChange -> tableChange.create("SNAME.TNAME", chg -> {
+            chg.changeColumns(cols ->
+                    cols.create("c1", col -> col.changeType(t -> t.changeType("STRING")))
+                            .create("c2", col -> col.changeType(t -> t.changeType("STRING"))));
 
-        var successfulCompletion = new RuntimeException("This is expected");
+            chg.changePrimaryKey(pk -> pk.changeColumns("c1").changeColocationColumns("c1"));
+        })).get();
 
-        when(tableManagerMock.tableAsyncInternal(eq(canonicalName))).thenReturn(CompletableFuture.completedFuture(mock(TableImpl.class)));
-        when(tableManagerMock.alterTableAsync(any(), any())).thenAnswer(answer -> {
-            Consumer<TableChange> changeConsumer = answer.getArgument(1);
+        NamedConfigurationTree<TableConfiguration, TableView, TableChange> cfg0 = tablesConfig.tables();
 
-            try {
-                changeConsumer.accept(tableChange);
-            } catch (Throwable th) {
-                return CompletableFuture.failedFuture(th);
-            }
+        List<UUID> ids = ((NamedListConfiguration<TableConfiguration, ?, ?>) cfg0).internalIds();
 
-            // return failed future here to prevent index manager to further process the
-            // create request, because after returned future is competed, index manager expects
-            // to all configuration listeners to be executed, but no one have configured them
-            // for this test
-            return CompletableFuture.failedFuture(successfulCompletion);
-        });
+        assertEquals(1, ids.size());
 
-        var indexManager = new IndexManager(tableManagerMock, listener -> {});
+        UUID tableId = ids.get(0);
+
+        indexManager.createIndexAsync("sName", "idx", "tName", true, indexChange -> {
+            SortedIndexChange sortedIndexChange = indexChange.convert(SortedIndexChange.class);
+
+            sortedIndexChange.changeColumns(columns -> {
+                columns.create("c1", columnChange -> columnChange.changeAsc(true));
+                columns.create("c2", columnChange -> columnChange.changeAsc(false));
+            });
+
+            sortedIndexChange.changeTableId(tableId);
+        }).join();
 
-        try {
-            indexManager.createIndexAsync("sName", "idx", "tName", true, indexChange -> {
-                var sortedIndexChange = indexChange.convert(SortedIndexChange.class);
-
-                sortedIndexChange.changeColumns(columns -> {
-                    columns.create("c1", columnChange -> columnChange.changeAsc(true));
-                    columns.create("c2", columnChange -> columnChange.changeAsc(false));
-                });
-            }).join();
-        } catch (CompletionException ex) {
-            if (ex.getCause() != successfulCompletion) {
-                throw ex;
-            }
-        }
+
+        String awaitIdxName = parseCanonicalName(canonicalName("sName", "idx"));
 
         var expected = List.of(
                 Map.of(
@@ -152,38 +166,40 @@ public class IndexManagerTest {
                                         "name", "c2"
                                 )
                         ),
-                        "name", "idx",
+                        "name", awaitIdxName,
                         "type", "SORTED",
-                        "uniq", false
+                        "uniq", false,
+                        "tableId", tableId
                 )
         );
 
-        assertSameObjects(expected, toMap(tableChange.indices()));
+        assertSameObjects(expected, toMap(tablesConfig.indexes().value()));
     }
 
     @Test
     public void createIndexForNonExistingTable() {
-        var tableManagerMock = mock(TableManager.class);
-        var canonicalName = "sName.tName";
+        var canonicalName = parseCanonicalName(canonicalName("sName", "tName"));
 
-        when(tableManagerMock.tableAsyncInternal(eq(canonicalName))).thenReturn(CompletableFuture.completedFuture(null));
+        TablesConfiguration tablesConfig = confRegistry.getConfiguration(TablesConfiguration.KEY);
 
-        var indexManager = new IndexManager(tableManagerMock, listener -> {});
+        var indexManager = new IndexManager(tablesConfig);
+        indexManager.start();
 
         CompletionException completionException = assertThrows(
                 CompletionException.class,
                 () -> indexManager.createIndexAsync("sName", "idx", "tName", true, indexChange -> {/* doesn't matter */}).join()
         );
 
-        assertThat(completionException.getCause(), instanceOf(TableNotFoundException.class));
-        assertThat(completionException.getCause().getMessage(), containsString(canonicalName));
+        assertTrue(IgniteTestUtils.hasCause(completionException, TableNotFoundException.class,
+                "The table does not exist [name=SNAME.TNAME]"));
     }
 
     @Test
     public void createIndexWithEmptyName() {
-        var tableManagerMock = mock(TableManager.class);
+        TablesConfiguration tablesConfig = confRegistry.getConfiguration(TablesConfiguration.KEY);
 
-        var indexManager = new IndexManager(tableManagerMock, listener -> {});
+        var indexManager = new IndexManager(tablesConfig);
+        indexManager.start();
 
         CompletionException completionException = assertThrows(
                 CompletionException.class,
@@ -198,91 +214,79 @@ public class IndexManagerTest {
     }
 
     @Test
-    public void createIndexWithEmptyColumnList() {
-        var tableManagerMock = mock(TableManager.class);
-        var tableMock = mock(TableImpl.class);
-        TableChange tableChange = createNode(TableConfigurationSchema.class);
-
-        when(tableManagerMock.tableAsyncInternal(any())).thenReturn(CompletableFuture.completedFuture(tableMock));
-        when(tableManagerMock.alterTableAsync(any(), any())).thenAnswer(answer -> {
-            Consumer<TableChange> changeConsumer = answer.getArgument(1);
-
-            Throwable exception = null;
-            try {
-                changeConsumer.accept(tableChange);
-            } catch (Exception ex) {
-                exception = ex;
-            }
+    public void createIndexWithEmptyColumnList() throws Exception {
+        TablesConfiguration tablesConfig = confRegistry.getConfiguration(TablesConfiguration.KEY);
 
-            if (exception == null) {
-                // consumer expected to fail on validation
-                exception = new AssertionError();
-            }
+        var indexManager = new IndexManager(tablesConfig);
+        indexManager.start();
 
-            return CompletableFuture.failedFuture(exception);
-        });
+        tablesConfig.tables().change(tableChange -> tableChange.create("SNAME.TNAME", chg -> {
+            chg.changeColumns(cols ->
+                    cols.create("id", col -> col.changeType(t -> t.changeType("STRING"))));
 
-        var indexManager = new IndexManager(tableManagerMock, listener -> {});
+            chg.changePrimaryKey(pk -> pk.changeColumns("id").changeColocationColumns("id"));
+        })).get();
 
         CompletionException completionException = assertThrows(
                 CompletionException.class,
                 () -> indexManager.createIndexAsync("sName", "idx", "tName", true,
-                        indexChange -> indexChange.convert(HashIndexChange.class).changeColumnNames()).join()
+                        indexChange -> indexChange.convert(HashIndexChange.class).changeColumnNames()
+                                .changeTableId(UUID.randomUUID())).join()
         );
 
-        assertThat(completionException.getCause(), instanceOf(IgniteInternalException.class));
+        assertTrue(IgniteTestUtils.hasCause(completionException, ConfigurationValidationException.class,
+                "At least one column should be specified"));
+
         assertThat(
-                ((IgniteInternalException) completionException.getCause()).code(),
-                equalTo(ErrorGroups.Index.INVALID_INDEX_DEFINITION_ERR)
+                (IgniteTestUtils.cause(completionException, ConfigurationValidationException.class)).getMessage(),
+                containsString(ErrorGroups.Index.INDEX_ERR_GROUP.name())
         );
     }
 
     @Test
-    public void createIndexForNonExistingColumn() {
-        var tableManagerMock = mock(TableManager.class);
-        var canonicalName = "sName.tName";
-        var tableMock = mock(TableImpl.class);
-        TableChange tableChange = createNode(TableConfigurationSchema.class);
-
-        when(tableManagerMock.tableAsyncInternal(eq(canonicalName))).thenReturn(CompletableFuture.completedFuture(tableMock));
-        when(tableManagerMock.alterTableAsync(any(), any())).thenAnswer(answer -> {
-            Consumer<TableChange> changeConsumer = answer.getArgument(1);
-
-            Throwable exception = null;
-            try {
-                changeConsumer.accept(tableChange);
-            } catch (Exception ex) {
-                exception = ex;
-            }
+    public void createIndexForNonExistingColumn() throws Exception {
+        TablesConfiguration tablesConfig = confRegistry.getConfiguration(TablesConfiguration.KEY);
 
-            if (exception == null) {
-                // consumer expected to fail on validation
-                exception = new AssertionError();
-            }
+        var indexManager = new IndexManager(tablesConfig);
+        indexManager.start();
 
-            return CompletableFuture.failedFuture(exception);
-        });
+        tablesConfig.tables().change(tableChange -> tableChange.create("SNAME.TNAME", chg -> {
+            chg.changeColumns(cols ->
+                    cols.create("id", col -> col.changeType(t -> t.changeType("STRING"))));
 
-        var indexManager = new IndexManager(tableManagerMock, listener -> {});
+            chg.changePrimaryKey(pk -> pk.changeColumns("id").changeColocationColumns("id"));
+        })).get();
 
         CompletionException completionException = assertThrows(
                 CompletionException.class,
                 () -> indexManager.createIndexAsync("sName", "idx", "tName", true,
-                        indexChange -> indexChange.convert(HashIndexChange.class).changeColumnNames("nonExistingColumn")).join()
+                        indexChange ->
+                                indexChange.convert(HashIndexChange.class).changeColumnNames("nonExistingColumn")
+                                        .changeTableId(UUID.randomUUID())).join()
         );
 
-        assertThat(completionException.getCause(), instanceOf(IgniteInternalException.class));
+        assertTrue(IgniteTestUtils.hasCause(completionException, ConfigurationValidationException.class,
+                "column must exist in the schema"));
+
         assertThat(
-                ((IgniteInternalException) completionException.getCause()).code(),
-                equalTo(Table.COLUMN_NOT_FOUND_ERR)
+                (IgniteTestUtils.cause(completionException, ConfigurationValidationException.class)).getMessage(),
+                containsString(Table.TABLE_ERR_GROUP.name())
         );
     }
 
     @Test
-    public void dropNonExistingIndex() {
-        var tableManagerMock = mock(TableManager.class);
+    public void dropNonExistingIndex() throws Exception {
+        TablesConfiguration tablesConfig = confRegistry.getConfiguration(TablesConfiguration.KEY);
 
-        var indexManager = new IndexManager(tableManagerMock, listener -> {});
+        var indexManager = new IndexManager(tablesConfig);
+        indexManager.start();
+
+        tablesConfig.tables().change(tableChange -> tableChange.create("SNAME.TNAME", chg -> {
+            chg.changeColumns(cols ->
+                    cols.create("id", col -> col.changeType(t -> t.changeType("STRING"))));
+
+            chg.changePrimaryKey(pk -> pk.changeColumns("id").changeColocationColumns("id"));
+        })).get();
 
         CompletionException completionException = assertThrows(
                 CompletionException.class,
@@ -298,17 +302,20 @@ public class IndexManagerTest {
 
     @Test
     @SuppressWarnings("ConstantConditions")
-    public void eventIsFiredWhenIndexCreated() {
-        var tableManagerMock = mock(TableManager.class);
-        var indexId = UUID.randomUUID();
-        var tableId = UUID.randomUUID();
-        var indexName = "idxName";
+    public void eventIsFiredWhenIndexCreated() throws Exception {
+        var indexName = "SCHEMA.INDEXNAME";
 
-        Consumer<ConfigurationNamedListListener<TableIndexView>> listenerConsumer = listener -> {
-            listener.onCreate(createConfigurationEventIndexAddedMock(indexId, tableId, indexName));
-        };
+        TablesConfiguration tablesConfig = confRegistry.getConfiguration(TablesConfiguration.KEY);
+
+        var indexManager = new IndexManager(tablesConfig);
+        indexManager.start();
 
-        var indexManager = new IndexManager(tableManagerMock, listenerConsumer);
+        tablesConfig.tables().change(tableChange -> tableChange.create("SCHEMA.TNAME", chg -> {
+            chg.changeColumns(cols ->
+                    cols.create("id", col -> col.changeType(t -> t.changeType("STRING"))));
+
+            chg.changePrimaryKey(pk -> pk.changeColumns("id").changeColocationColumns("id"));
+        })).get();
 
         AtomicReference<IndexEventParameters> holder = new AtomicReference<>();
 
@@ -320,67 +327,34 @@ public class IndexManagerTest {
 
         indexManager.start();
 
-        assertThat(holder.get(), notNullValue());
-        assertThat(holder.get().index().id(), equalTo(indexId));
-        assertThat(holder.get().index().tableId(), equalTo(tableId));
-        assertThat(holder.get().index().name(), equalTo("PUBLIC." + indexName));
-    }
+        NamedConfigurationTree<TableConfiguration, TableView, TableChange> cfg0 = tablesConfig.tables();
 
-    @SuppressWarnings("unchecked")
-    private ConfigurationNotificationEvent<TableIndexView> createConfigurationEventIndexAddedMock(
-            UUID indexId,
-            UUID tableId,
-            String canonicalIndexName
-    ) {
-        ConfigurationValue<UUID> confValueMock = mock(ConfigurationValue.class);
-        when(confValueMock.value()).thenReturn(tableId).getMock();
+        List<UUID> ids = ((NamedListConfiguration<TableConfiguration, ?, ?>) cfg0).internalIds();
 
-        ExtendedTableConfiguration extendedConfMock = mock(ExtendedTableConfiguration.class);
-        when(extendedConfMock.id()).thenReturn(confValueMock);
+        assertEquals(1, ids.size());
 
-        HashIndexView indexViewMock = mock(HashIndexView.class);
-        when(indexViewMock.id()).thenReturn(indexId);
-        when(indexViewMock.name()).thenReturn(canonicalIndexName);
-        when(indexViewMock.columnNames()).thenReturn(new String[] {"c1", "c2"});
+        UUID tableId = ids.get(0);
 
-        ConfigurationNotificationEvent<TableIndexView> configurationEventMock = mock(ConfigurationNotificationEvent.class);
-        when(configurationEventMock.config(any())).thenReturn(extendedConfMock);
-        when(configurationEventMock.newValue()).thenReturn(indexViewMock);
+        indexManager.createIndexAsync("SCHEMA", "indexName", "tName", true, indexChange -> {
+            SortedIndexChange sortedIndexChange = indexChange.convert(SortedIndexChange.class);
 
-        return configurationEventMock;
-    }
+            sortedIndexChange.changeColumns(columns -> {
+                columns.create("id", columnChange -> columnChange.changeAsc(true));
+            });
 
-    /** Creates configuration node for given *SchemaConfiguration class. */
-    @SuppressWarnings({"unchecked", "SameParameterValue"})
-    private <T> T createNode(Class<?> cls) {
-        var cgen = new ConfigurationAsmGenerator();
-
-        Map<Class<?>, Set<Class<?>>> polymorphicExtensions = Map.of(
-                DataStorageConfigurationSchema.class,
-                Set.of(
-                        UnknownDataStorageConfigurationSchema.class
-                ),
-                TableIndexConfigurationSchema.class,
-                Set.of(
-                        HashIndexConfigurationSchema.class,
-                        SortedIndexConfigurationSchema.class
-                ),
-                ColumnDefaultConfigurationSchema.class,
-                Set.of(
-                        ConstantValueDefaultConfigurationSchema.class,
-                        NullValueDefaultConfigurationSchema.class,
-                        FunctionCallDefaultConfigurationSchema.class
-                ),
-                LogStorageBudgetConfigurationSchema.class,
-                Set.of(
-                        UnlimitedBudgetConfigurationSchema.class,
-                        EntryCountBudgetConfigurationSchema.class
-                )
-        );
+            sortedIndexChange.changeTableId(tableId);
+        }).join();
+
+        ids = ((NamedListConfiguration<TableIndexConfiguration, ?, ?>) tablesConfig.indexes()).internalIds();
+
+        assertEquals(1, ids.size());
 
-        cgen.compileRootSchema(TablesConfiguration.KEY.schemaClass(), Map.of(), polymorphicExtensions);
+        UUID indexId = ids.get(0);
 
-        return (T) cgen.instantiateNode(cls);
+        assertThat(holder.get(), notNullValue());
+        assertThat(holder.get().index().id(), equalTo(indexId));
+        assertThat(holder.get().index().tableId(), equalTo(tableId));
+        assertThat(holder.get().index().name(), equalTo(indexName));
     }
 
     private static Object toMap(Object obj) {
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
index d0ef590b1b..319fb83cde 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
@@ -239,7 +239,7 @@ class ItComputeTest extends AbstractClusterIntegrationTest {
                         "bad-table", Tuple.create(Map.of("k", 1)), GetNodeNameJob.class).join());
 
         assertInstanceOf(TableNotFoundException.class, ex.getCause());
-        assertThat(ex.getCause().getMessage(), containsString("Table does not exist [name=bad-table]"));
+        assertThat(ex.getCause().getMessage(), containsString("The table does not exist [name=bad-table]"));
     }
 
     private void createTestTableWithOneRow() {
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index a4cbeb3b0c..98f7f26dbe 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -37,7 +37,6 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
-import org.apache.ignite.configuration.RootKey;
 import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
 import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
 import org.apache.ignite.configuration.schemas.rest.RestConfiguration;
@@ -414,9 +413,6 @@ public class ItRebalanceDistributedTest {
 
             txManager = new TableTxManagerImpl(clusterService, lockManager);
 
-            List<RootKey<?, ?>> rootKeys = List.of(
-                    TablesConfiguration.KEY);
-
             cmgManager = new ClusterManagementGroupManager(
                     vaultManager,
                     clusterService,
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
index bb5de7c829..63ac9f6cd6 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
@@ -83,9 +83,11 @@ public class ItIndexManagerTest extends AbstractBasicIntegrationTest {
             return CompletableFuture.completedFuture(true);
         });
 
-        Index<?> index = indexManager.createIndexAsync("PUBLIC", "INAME", "TNAME", true, tableIndexChange ->
+        indexManager.createIndexAsync("PUBLIC", "INAME", "TNAME", true, tableIndexChange ->
                 tableIndexChange.convert(HashIndexChange.class).changeColumnNames("c3", "c2")).join();
 
+        Index<?> index = createEventParamHolder.get().index();
+
         assertThat(index, notNullValue());
         assertThat(index.tableId(), equalTo(table.tableId()));
         assertThat(index.descriptor().columns(), hasItems("c3", "c2"));
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 28dfd7be78..7e5e84bede 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -188,7 +188,7 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
             String name,
             @Language("HOCON") String cfgString,
             @Nullable Consumer<Long> revisionCallback
-    ) throws NodeStoppingException {
+    ) {
         Path dir = workDir.resolve(name);
 
         partialNode = new ArrayList<>();
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
index d918340e96..71bd56492f 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
@@ -29,6 +29,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -41,11 +42,13 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.schema.SchemaUtils;
 import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
 import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.test.WatchListenerInhibitor;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.ColumnAlreadyExistsException;
 import org.apache.ignite.lang.IndexAlreadyExistsException;
@@ -54,7 +57,7 @@ import org.apache.ignite.lang.TableAlreadyExistsException;
 import org.apache.ignite.lang.TableNotFoundException;
 import org.apache.ignite.schema.definition.ColumnDefinition;
 import org.apache.ignite.schema.definition.ColumnType;
-import org.apache.ignite.schema.definition.index.IndexDefinition;
+import org.apache.ignite.sql.Session;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
 import org.junit.jupiter.api.AfterEach;
@@ -73,7 +76,7 @@ public class ItTablesApiTest extends IgniteAbstractTest {
     public static final String SHORT_TABLE_NAME = "tbl1";
 
     /** Table name. */
-    public static final String TABLE_NAME = SCHEMA + "." + SHORT_TABLE_NAME;
+    public static final String TABLE_NAME = SchemaUtils.canonicalName(SCHEMA, SHORT_TABLE_NAME);
 
     /** Nodes bootstrap configuration. */
     private final List<String> nodesBootstrapCfg = List.of(
@@ -253,12 +256,15 @@ public class ItTablesApiTest extends IgniteAbstractTest {
 
         createTable(ignite0, SCHEMA, SHORT_TABLE_NAME);
 
-        addIndex(ignite0, SCHEMA, SHORT_TABLE_NAME);
+        tryToCreateIndex(ignite0, SCHEMA, SHORT_TABLE_NAME, true);
 
-        assertThrows(IndexAlreadyExistsException.class,
-                () -> addIndex(ignite0, SCHEMA, SHORT_TABLE_NAME));
+        try {
+            tryToCreateIndex(ignite0, SCHEMA, SHORT_TABLE_NAME, true);
+        } catch (Throwable e) {
+            IgniteTestUtils.hasCause(e, IndexAlreadyExistsException.class, null);
+        }
 
-        addIndexIfNotExists(ignite0, SCHEMA, SHORT_TABLE_NAME);
+        tryToCreateIndex(ignite0, SCHEMA, SHORT_TABLE_NAME, false);
     }
 
     /**
@@ -280,15 +286,20 @@ public class ItTablesApiTest extends IgniteAbstractTest {
 
         ignite1Inhibitor.startInhibit();
 
-        addIndex(ignite0, SCHEMA, SHORT_TABLE_NAME);
+        tryToCreateIndex(ignite0, SCHEMA, SHORT_TABLE_NAME, true);
 
-        CompletableFuture<Void> addIndesFut = runAsync(() -> addIndex(ignite1, SCHEMA, SHORT_TABLE_NAME));
+        CompletableFuture<Void> addIndesFut = runAsync(() -> tryToCreateIndex(ignite1, SCHEMA, SHORT_TABLE_NAME, true));
         CompletableFuture<Void> addIndesIfNotExistsFut = runAsync(() -> addIndexIfNotExists(ignite1, SCHEMA, SHORT_TABLE_NAME));
 
         for (Ignite ignite : clusterNodes) {
             if (ignite != ignite1) {
-                assertThrows(IndexAlreadyExistsException.class,
-                        () -> addIndex(ignite, SCHEMA, SHORT_TABLE_NAME));
+                try {
+                    tryToCreateIndex(ignite, SCHEMA, SHORT_TABLE_NAME, true);
+
+                    fail("Should not reach here");
+                } catch (Throwable e) {
+                    IgniteTestUtils.hasCause(e, IndexAlreadyExistsException.class, null);
+                }
 
                 addIndexIfNotExists(ignite, SCHEMA, SHORT_TABLE_NAME);
             }
@@ -299,13 +310,11 @@ public class ItTablesApiTest extends IgniteAbstractTest {
 
         ignite1Inhibitor.stopInhibit();
 
-        assertThrows(IndexAlreadyExistsException.class, () -> {
-            try {
-                addIndesFut.get(10, TimeUnit.SECONDS);
-            } catch (ExecutionException e) {
-                throw e.getCause();
-            }
-        });
+        try {
+            addIndesFut.get(10, TimeUnit.SECONDS);
+        } catch (Throwable e) {
+            IgniteTestUtils.hasCause(e, IndexAlreadyExistsException.class, null);
+        }
 
         addIndesIfNotExistsFut.get(10, TimeUnit.SECONDS);
     }
@@ -574,20 +583,15 @@ public class ItTablesApiTest extends IgniteAbstractTest {
      * @param schemaName Schema name.
      * @param shortTableName Table name.
      */
-    protected void addIndex(Ignite node, String schemaName, String shortTableName) {
-        IndexDefinition idx = SchemaBuilders.hashIndex("testHI")
-                .withColumns("valInt", "valStr")
-                .build();
+    protected void tryToCreateIndex(Ignite node, String schemaName, String shortTableName, boolean failIfNotExist) {
+        String tblName = SchemaUtils.canonicalName(schemaName, shortTableName);
 
-        node.tables().alterTable(schemaName + "." + shortTableName, chng -> chng.changeIndices(idxes -> {
-            if (idxes.get(idx.name()) != null) {
-                log.info("Index already exists [naem={}]", idx.name());
-
-                throw new IndexAlreadyExistsException(idx.name());
-            }
+        var tmpl  = "CREATE INDEX %s testHI ON %s (valInt, valStr)";
+        var sql = String.format(tmpl, failIfNotExist ? "" : "IF NOT EXISTS", tblName);
 
-            idxes.create(idx.name(), tableIndexChange -> convert(idx, tableIndexChange));
-        }));
+        try (Session ses = node.sql().createSession()) {
+            ses.execute(null, sql);
+        }
     }
 
     /**
@@ -598,16 +602,10 @@ public class ItTablesApiTest extends IgniteAbstractTest {
      * @param shortTableName Table name.
      */
     protected void addIndexIfNotExists(Ignite node, String schemaName, String shortTableName) {
-        IndexDefinition idx = SchemaBuilders.hashIndex("testHI")
-                .withColumns("valInt", "valStr")
-                .build();
-
-        node.tables().alterTable(schemaName + "." + shortTableName, chng -> chng.changeIndices(idxes -> {
-            if (idxes.get(idx.name()) == null) {
-                idxes.create(idx.name(), tableIndexChange -> convert(idx, tableIndexChange));
-            } else {
-                log.info("Index already exists [naem={}]", idx.name());
-            }
-        }));
+        String tblName = SchemaUtils.canonicalName(schemaName, shortTableName);
+
+        try (Session ses = node.sql().createSession()) {
+            ses.execute(null, String.format("CREATE INDEX IF NOT EXISTS testHI ON %s (CAT_ID)", tblName));
+        }
     }
 }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
index 7e8ceaf1e2..fa9f8ba929 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -134,7 +134,7 @@ public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest {
         checkDdl(true, ses, "ALTER TABLE TEST ADD COLUMN IF NOT EXISTS VAL1 VARCHAR");
         checkError(
                 TableNotFoundException.class,
-                "Table does not exist [name=PUBLIC.NOT_EXISTS_TABLE]",
+                "The table does not exist [name=PUBLIC.NOT_EXISTS_TABLE]",
                 ses,
                 "ALTER TABLE NOT_EXISTS_TABLE ADD COLUMN VAL1 VARCHAR"
         );
@@ -151,17 +151,20 @@ public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest {
         checkDdl(true, ses, "CREATE INDEX TEST_IDX ON TEST(VAL0)");
         checkError(
                 IndexAlreadyExistsException.class,
-                "Index already exists [name=TEST_IDX]",
+                "Index already exists [name=PUBLIC.TEST_IDX]",
                 ses,
                 "CREATE INDEX TEST_IDX ON TEST(VAL1)"
         );
         checkDdl(false, ses, "CREATE INDEX IF NOT EXISTS TEST_IDX ON TEST(VAL1)");
 
+        checkDdl(true, ses, "DROP INDEX TESt_iDX");
+        checkDdl(true, ses, "CREATE INDEX TEST_IDX ON TEST(VAL0)");
+
         // DROP COLUMNS
         checkDdl(true, ses, "ALTER TABLE TEST DROP COLUMN VAL1");
         checkError(
                 TableNotFoundException.class,
-                "Table does not exist [name=PUBLIC.NOT_EXISTS_TABLE]",
+                "The table does not exist [name=PUBLIC.NOT_EXISTS_TABLE]",
                 ses,
                 "ALTER TABLE NOT_EXISTS_TABLE DROP COLUMN VAL1"
         );
@@ -182,10 +185,12 @@ public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest {
         checkDdl(true, ses, "DROP TABLE TEST");
         checkError(
                 TableNotFoundException.class,
-                "Table does not exist [name=PUBLIC.TEST]",
+                "The table does not exist [name=PUBLIC.TEST]",
                 ses,
                 "DROP TABLE TEST"
         );
+
+        checkDdl(false, ses, "DROP INDEX IF EXISTS TEST_IDX");
     }
 
     @Test
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
index d7867aab07..599cb71c56 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
@@ -110,7 +110,7 @@ public class ItSqlSynchronousApiTest extends AbstractBasicIntegrationTest {
         checkDdl(true, ses, "ALTER TABLE TEST ADD COLUMN IF NOT EXISTS VAL1 VARCHAR");
         checkError(
                 TableNotFoundException.class,
-                "Table does not exist [name=PUBLIC.NOT_EXISTS_TABLE]",
+                "The table does not exist [name=PUBLIC.NOT_EXISTS_TABLE]",
                 ses,
                 "ALTER TABLE NOT_EXISTS_TABLE ADD COLUMN VAL1 VARCHAR"
         );
@@ -127,17 +127,20 @@ public class ItSqlSynchronousApiTest extends AbstractBasicIntegrationTest {
         checkDdl(true, ses, "CREATE INDEX TEST_IDX ON TEST(VAL0)");
         checkError(
                 IndexAlreadyExistsException.class,
-                "Index already exists [name=TEST_IDX]",
+                "Index already exists [name=PUBLIC.TEST_IDX]",
                 ses,
                 "CREATE INDEX TEST_IDX ON TEST(VAL1)"
         );
         checkDdl(false, ses, "CREATE INDEX IF NOT EXISTS TEST_IDX ON TEST(VAL1)");
 
+        checkDdl(true, ses, "DROP INDEX TESt_iDX");
+        checkDdl(true, ses, "CREATE INDEX TEST_IDX ON TEST(VAL0)");
+
         // DROP COLUMNS
         checkDdl(true, ses, "ALTER TABLE TEST DROP COLUMN VAL1");
         checkError(
                 TableNotFoundException.class,
-                "Table does not exist [name=PUBLIC.NOT_EXISTS_TABLE]",
+                "The table does not exist [name=PUBLIC.NOT_EXISTS_TABLE]",
                 ses,
                 "ALTER TABLE NOT_EXISTS_TABLE DROP COLUMN VAL1"
         );
@@ -158,10 +161,12 @@ public class ItSqlSynchronousApiTest extends AbstractBasicIntegrationTest {
         checkDdl(true, ses, "DROP TABLE TEST");
         checkError(
                 TableNotFoundException.class,
-                "Table does not exist [name=PUBLIC.TEST]",
+                "The table does not exist [name=PUBLIC.TEST]",
                 ses,
                 "DROP TABLE TEST"
         );
+
+        checkDdl(false, ses, "DROP INDEX IF EXISTS TEST_IDX");
     }
 
     @Test
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/AbstractBasicIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/AbstractBasicIntegrationTest.java
index 47c064ec1d..7a2d69082e 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/AbstractBasicIntegrationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/AbstractBasicIntegrationTest.java
@@ -56,6 +56,7 @@ import org.apache.ignite.lang.IgniteStringFormatter;
 import org.apache.ignite.schema.definition.ColumnType;
 import org.apache.ignite.schema.definition.TableDefinition;
 import org.apache.ignite.sql.ColumnMetadata;
+import org.apache.ignite.sql.Session;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
@@ -165,6 +166,22 @@ public class AbstractBasicIntegrationTest extends BaseIgniteAbstractTest {
         }
     }
 
+    /**
+     * Appends indexes.
+     *
+     * @param node Execution cluster node.
+     * @param idxs Map with index representation.
+     * @param tblCanonicalName Canonical table name to create index in.
+     */
+    protected static void addIndexes(Ignite node, Map<String, List<String>> idxs, String tblCanonicalName) {
+        try (Session ses = node.sql().createSession()) {
+            for (Map.Entry<String, List<String>> idx : idxs.entrySet()) {
+                ses.execute(null, String.format("CREATE INDEX %s ON %s (%s)", idx.getKey(), tblCanonicalName,
+                        String.join(",", idx.getValue())));
+            }
+        }
+    }
+
     /**
      * Invokes before the test will start.
      *
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItOrToUnionRuleTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItOrToUnionRuleTest.java
index 44466aa938..943cdbe753 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItOrToUnionRuleTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItOrToUnionRuleTest.java
@@ -23,6 +23,7 @@ import static org.apache.ignite.internal.sql.engine.util.QueryChecker.containsUn
 import static org.hamcrest.CoreMatchers.not;
 
 import java.util.List;
+import java.util.Map;
 import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
 import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
 import org.apache.ignite.schema.definition.ColumnType;
@@ -69,20 +70,23 @@ public class ItOrToUnionRuleTest extends AbstractBasicIntegrationTest {
                 .withPrimaryKey("ID")
                 .build();
 
-        Table tbl = CLUSTER_NODES.get(0).tables().createTable(schTbl1.canonicalName(), tblCh ->
+        String tbl1CanonicalName = schTbl1.canonicalName();
+
+        Table tbl = CLUSTER_NODES.get(0).tables().createTable(tbl1CanonicalName, tblCh ->
                 SchemaConfigurationConverter.convert(schTbl1, tblCh)
                         .changeReplicas(1)
                         .changePartitions(10)
         );
 
-        CLUSTER_NODES.get(0).tables().alterTable(schTbl1.canonicalName(), tblCh ->
-                List.of(SchemaBuilders.sortedIndex(IDX_CATEGORY).addIndexColumn("CATEGORY").done().build(),
-                        SchemaBuilders.sortedIndex(IDX_CAT_ID).addIndexColumn("CAT_ID").done().build(),
-                        SchemaBuilders.sortedIndex(IDX_SUBCATEGORY).addIndexColumn("SUBCATEGORY").done().build(),
-                        SchemaBuilders.sortedIndex(IDX_SUBCAT_ID).addIndexColumn("SUBCAT_ID").done().build()
-                ).forEach(idxDef -> SchemaConfigurationConverter.addIndex(idxDef, tblCh))
+        Map<String, List<String>> idxs = Map.of(
+                IDX_CATEGORY, List.of("CATEGORY"),
+                IDX_CAT_ID, List.of("CAT_ID"),
+                IDX_SUBCATEGORY, List.of("SUBCATEGORY"),
+                IDX_SUBCAT_ID, List.of("SUBCAT_ID")
         );
 
+        addIndexes(CLUSTER_NODES.get(0), idxs, tbl1CanonicalName);
+
         insertData(tbl, new String[]{"ID", "CATEGORY", "CAT_ID", "SUBCATEGORY", "SUBCAT_ID", "NAME"}, new Object[][]{
                 {1, "Photo", 1, "Camera Media", 11, "Media 1"},
                 {2, "Photo", 1, "Camera Media", 11, "Media 2"},
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItProjectScanMergeRuleTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItProjectScanMergeRuleTest.java
index 4ba202c1bb..8447f8ef2d 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItProjectScanMergeRuleTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItProjectScanMergeRuleTest.java
@@ -62,12 +62,6 @@ public class ItProjectScanMergeRuleTest extends AbstractBasicIntegrationTest {
                         .changePartitions(10)
         );
 
-        CLUSTER_NODES.get(0).tables().alterTable(schTbl1.canonicalName(), tblCh ->
-                SchemaConfigurationConverter.addIndex(SchemaBuilders.sortedIndex(IDX_CAT_ID)
-                        .addIndexColumn("CAT_ID").done()
-                        .build(), tblCh)
-        );
-
         insertData(tbl, new String[]{"ID", "CATEGORY", "CAT_ID", "SUBCATEGORY", "SUBCAT_ID", "NAME"}, new Object[][]{
                 {1, "prod1", 1, "cat1", 11, "noname1"},
                 {2, "prod2", 2, "cat1", 11, "noname2"},
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java
index ebe4642251..e9ed4ecd07 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java
@@ -27,6 +27,7 @@ import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.Matchers.not;
 
 import java.util.List;
+import java.util.Map;
 import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
 import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
 import org.apache.ignite.schema.definition.ColumnType;
@@ -71,20 +72,14 @@ public class ItSecondaryIndexTest extends AbstractBasicIntegrationTest {
                         .changePartitions(10)
         );
 
-        CLUSTER_NODES.get(0).tables().alterTable(tlbDef.canonicalName(), tblCh ->
-                List.of(SchemaBuilders.sortedIndex(DEPID_IDX).addIndexColumn("DEPID").done().build(),
-                        SchemaBuilders.sortedIndex(NAME_CITY_IDX)
-                                .addIndexColumn("DEPID").desc().done()
-                                .addIndexColumn("CITY").desc().done()
-                                .build(),
-                        SchemaBuilders.sortedIndex(NAME_DEPID_CITY_IDX)
-                                .addIndexColumn("NAME").done()
-                                .addIndexColumn("DEPID").desc().done()
-                                .addIndexColumn("CITY").desc().done()
-                                .build()
-                ).forEach(idxDef -> SchemaConfigurationConverter.addIndex(idxDef, tblCh))
+        Map<String, List<String>> idxs = Map.of(
+                DEPID_IDX, List.of("DEPID"),
+                NAME_CITY_IDX, List.of("DEPID desc", "CITY desc"),
+                NAME_DEPID_CITY_IDX, List.of("NAME", "DEPID desc", "CITY desc")
         );
 
+        addIndexes(CLUSTER_NODES.get(0), idxs, tlbDef.canonicalName());
+
         insertData(dev0, new String[]{"ID", "NAME", "DEPID", "CITY", "AGE"}, new Object[][]{
                 {1, "Mozart", 3, "Vienna", 33},
                 {2, "Beethoven", 2, "Vienna", 44},
@@ -127,13 +122,9 @@ public class ItSecondaryIndexTest extends AbstractBasicIntegrationTest {
                 SchemaConfigurationConverter.convert(schema1, tblCh)
                         .changeReplicas(2)
                         .changePartitions(10)
-        );
-
-        CLUSTER_NODES.get(0).tables().alterTable(tlbDef.canonicalName(), tblCh ->
-                SchemaConfigurationConverter.addIndex(SchemaBuilders.sortedIndex(PK_IDX)
-                        .addIndexColumn("F2").done()
-                        .addIndexColumn("F1").done()
-                        .build(), tblCh)
+                        .changePrimaryKey(primaryKeyChange -> {
+                            primaryKeyChange.changeColumns("F2", "F1");
+                        })
         );
 
         insertData(dev1, new String[]{"F1", "F2", "F3", "F4"}, new Object[][]{
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 27ba58c950..d00fe65cff 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -314,8 +314,10 @@ public class IgniteImpl implements Ignite {
                 ServiceLoader.load(DataStorageModule.class, serviceProviderClassLoader)
         );
 
+        final TablesConfiguration tablesConfiguration = clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY);
+
         dataStorageMgr = new DataStorageManager(
-                clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY),
+                tablesConfiguration,
                 dataStorageModules.createStorageEngines(
                         name,
                         clusterCfgMgr.configurationRegistry(),
@@ -324,17 +326,14 @@ public class IgniteImpl implements Ignite {
                 )
         );
 
-        schemaManager = new SchemaManager(
-            registry,
-            clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
-        );
+        schemaManager = new SchemaManager(registry, tablesConfiguration);
 
         volatileLogStorageFactoryCreator = new VolatileLogStorageFactoryCreator(workDir.resolve("volatile-log-spillout"));
 
         distributedTblMgr = new TableManager(
                 name,
                 registry,
-                clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY),
+                tablesConfiguration,
                 raftMgr,
                 baselineMgr,
                 clusterSvc.topologyService(),
@@ -345,15 +344,7 @@ public class IgniteImpl implements Ignite {
                 volatileLogStorageFactoryCreator
         );
 
-        indexManager = new IndexManager(
-                distributedTblMgr,
-                clusterCfgMgr.configurationRegistry()
-                        .getConfiguration(TablesConfiguration.KEY)
-                        .tables()
-                        .any()
-                        .indices()
-                        ::listenElements
-        );
+        indexManager = new IndexManager(tablesConfiguration);
 
         qryEngine = new SqlQueryProcessor(
                 registry,
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java
index 7ef194183b..d2f8ae8228 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java
@@ -152,4 +152,16 @@ public class SchemaUtils {
     public static String canonicalName(String schema, String name) {
         return schema + '.' + name;
     }
+
+    /**
+     * Extracts schema from canonical, made by {@link #canonicalName(String, String)}.
+     *
+     * @param canonicalName Canonical name.
+     * @return Schema.
+     */
+    public static String extractSchema(String canonicalName) {
+        int sepPos = canonicalName.indexOf('.');
+        assert sepPos != -1 : "No schema defined in " + canonicalName;
+        return canonicalName.substring(0, sepPos);
+    }
 }
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/IndexValidatorImpl.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/IndexValidatorImpl.java
new file mode 100644
index 0000000000..7f454ded41
--- /dev/null
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/IndexValidatorImpl.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema.configuration;
+
+import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.getByInternalId;
+
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.configuration.schemas.table.IndexValidator;
+import org.apache.ignite.configuration.schemas.table.TableIndexView;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesView;
+import org.apache.ignite.configuration.validation.ValidationContext;
+import org.apache.ignite.configuration.validation.ValidationIssue;
+import org.apache.ignite.configuration.validation.Validator;
+import org.apache.ignite.internal.schema.definition.SchemaValidationUtils;
+import org.apache.ignite.internal.schema.definition.TableDefinitionImpl;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.schema.definition.ColumnDefinition;
+import org.apache.ignite.schema.definition.index.IndexDefinition;
+
+/**
+ * Index configuration validator implementation.
+ */
+public class IndexValidatorImpl implements Validator<IndexValidator, NamedListView<TableIndexView>> {
+    /** Static instance. */
+    public static final IndexValidatorImpl INSTANCE = new IndexValidatorImpl();
+
+    @Override
+    public void validate(IndexValidator annotation, ValidationContext<NamedListView<TableIndexView>> ctx) {
+        TablesView tablesConfig = ctx.getNewRoot(TablesConfiguration.KEY);
+
+        assert tablesConfig != null;
+
+        NamedListView<? extends TableIndexView> indexView = tablesConfig.indexes();
+
+        NamedListView<? extends TableView> tablesView = tablesConfig.tables();
+
+        for (String key : indexView.namedListKeys()) {
+            TableIndexView idxView = indexView.get(key);
+
+            UUID tableId = idxView.tableId();
+
+            TableView tableView = getByInternalId(tablesView, tableId);
+
+            if (tableView == null) {
+                ctx.addIssue(new ValidationIssue(key, "Unable to create index [name=" + key + "]. Table not found."));
+            }
+
+            IndexDefinition index = SchemaConfigurationConverter.convert(idxView);
+
+            TableDefinitionImpl tbl = SchemaConfigurationConverter.convert(tableView);
+
+            Set<String> tableColumns = tbl.columns().stream().map(ColumnDefinition::name).collect(Collectors.toSet());
+
+            List<String> tableColocationColumns = tbl.colocationColumns();
+
+            try {
+                SchemaValidationUtils.validateIndexes(index, tableColumns, tableColocationColumns);
+            } catch (IllegalStateException e) {
+                ctx.addIssue(new ValidationIssue(key, e.getMessage()));
+            }
+
+            try {
+                SchemaValidationUtils.validateColumns(idxView, tableColumns);
+            } catch (IgniteInternalException e) {
+                ctx.addIssue(new ValidationIssue(key, e.getMessage()));
+            }
+        }
+    }
+
+    /** Private constructor. */
+    private IndexValidatorImpl() {
+    }
+}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverter.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverter.java
index 31ac470546..fce92a95bf 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverter.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverter.java
@@ -402,12 +402,6 @@ public class SchemaConfigurationConverter {
      * @return TableChange to get result from.
      */
     public static TableChange convert(TableDefinition tbl, TableChange tblChg) {
-        tblChg.changeIndices(idxsChg -> {
-            for (IndexDefinition idx : tbl.indices()) {
-                idxsChg.create(idx.name(), idxInit -> convert(idx, idxInit));
-            }
-        });
-
         tblChg.changeColumns(colsChg -> {
             for (ColumnDefinition col : tbl.columns()) {
                 colsChg.create(col.name(), colChg -> convert(col, colChg));
@@ -456,25 +450,9 @@ public class SchemaConfigurationConverter {
             }
         }
 
-        NamedListView<? extends TableIndexView> idxsView = tblView.indices();
-
-        var indices = new HashMap<String, IndexDefinition>(capacity(idxsView.size()));
-
-        for (String key : idxsView.namedListKeys()) {
-            TableIndexView indexView = idxsView.get(key);
-
-            if (indexView == null) { // skip just deleted indices
-                continue;
-            }
-
-            IndexDefinition definition = convert(indexView);
-
-            indices.put(definition.name(), definition);
-        }
-
         PrimaryKeyDefinition primaryKey = convert(tblView.primaryKey());
 
-        return new TableDefinitionImpl(schemaName, tableName, columns, primaryKey, indices);
+        return new TableDefinitionImpl(schemaName, tableName, columns, primaryKey);
     }
 
     /**
@@ -503,22 +481,12 @@ public class SchemaConfigurationConverter {
      * Add index.
      *
      * @param idx       Index to add.
-     * @param tblChange TableChange to fulfill.
-     * @return TableChange to get result from.
-     */
-    public static TableChange addIndex(IndexDefinition idx, TableChange tblChange) {
-        return tblChange.changeIndices(idxsChg -> idxsChg.create(idx.name(), idxChg -> convert(idx, idxChg)));
-    }
-
-    /**
-     * Drop index.
-     *
-     * @param indexName Index name to drop.
-     * @param tblChange Table change to fulfill.
+     * @param tableId Table id.
+     * @param change Indexes change to fulfill.
      * @return TableChange to get result from.
      */
-    public static TableChange dropIndex(String indexName, TableChange tblChange) {
-        return tblChange.changeIndices(idxChg -> idxChg.delete(indexName));
+    public static TableIndexChange addIndex(IndexDefinition idx, UUID tableId, TableIndexChange change) {
+        return convert(idx, change).changeTableId(tableId);
     }
 
     /**
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaDistributedConfigurationModule.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaDistributedConfigurationModule.java
index 004461c139..68d42420f1 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaDistributedConfigurationModule.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaDistributedConfigurationModule.java
@@ -26,6 +26,7 @@ import org.apache.ignite.configuration.annotation.ConfigurationType;
 import org.apache.ignite.configuration.schemas.table.ColumnTypeValidator;
 import org.apache.ignite.configuration.schemas.table.ConstantValueDefaultConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.FunctionCallDefaultConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.IndexValidator;
 import org.apache.ignite.configuration.schemas.table.NullValueDefaultConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.TableValidator;
 import org.apache.ignite.configuration.validation.Validator;
@@ -46,7 +47,8 @@ public class SchemaDistributedConfigurationModule implements ConfigurationModule
     public Map<Class<? extends Annotation>, Set<Validator<? extends Annotation, ?>>> validators() {
         return Map.of(
                 TableValidator.class, Set.of(TableValidatorImpl.INSTANCE),
-                ColumnTypeValidator.class, Set.of(ColumnTypeValidatorImpl.INSTANCE)
+                ColumnTypeValidator.class, Set.of(ColumnTypeValidatorImpl.INSTANCE),
+                IndexValidator.class, Set.of(IndexValidatorImpl.INSTANCE)
         );
     }
 
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/TableValidatorImpl.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/TableValidatorImpl.java
index 138d935a83..df5531b221 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/TableValidatorImpl.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/TableValidatorImpl.java
@@ -23,7 +23,6 @@ import org.apache.ignite.configuration.schemas.table.TableView;
 import org.apache.ignite.configuration.validation.ValidationContext;
 import org.apache.ignite.configuration.validation.ValidationIssue;
 import org.apache.ignite.configuration.validation.Validator;
-import org.apache.ignite.internal.schema.definition.SchemaValidationUtils;
 import org.apache.ignite.internal.schema.definition.TableDefinitionImpl;
 
 /**
@@ -46,8 +45,6 @@ public class TableValidatorImpl implements Validator<TableValidator, NamedListVi
 
                 assert !tbl.keyColumns().isEmpty();
                 assert !tbl.colocationColumns().isEmpty();
-
-                SchemaValidationUtils.validateIndices(tbl.indices(), tbl.columns(), tbl.colocationColumns());
             } catch (IllegalArgumentException e) {
                 ctx.addIssue(new ValidationIssue(
                         ctx.currentKey(),
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/definition/SchemaValidationUtils.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/definition/SchemaValidationUtils.java
index 366a2bb29f..e88329cb07 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/definition/SchemaValidationUtils.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/definition/SchemaValidationUtils.java
@@ -17,11 +17,18 @@
 
 package org.apache.ignite.internal.schema.definition;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
+import org.apache.ignite.configuration.schemas.table.HashIndexView;
+import org.apache.ignite.configuration.schemas.table.SortedIndexView;
+import org.apache.ignite.configuration.schemas.table.TableIndexView;
+import org.apache.ignite.internal.util.CollectionUtils;
+import org.apache.ignite.lang.ErrorGroups;
+import org.apache.ignite.lang.ErrorGroups.Table;
+import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.schema.definition.ColumnDefinition;
 import org.apache.ignite.schema.definition.index.ColumnarIndexDefinition;
 import org.apache.ignite.schema.definition.index.IndexColumnDefinition;
@@ -47,30 +54,62 @@ public class SchemaValidationUtils {
     }
 
     /**
-     * Validate indices.
+     * Validate indexes.
      *
-     * @param indices Table indices.
-     * @param cols Table columns.
+     * @param index Table index.
+     * @param colNames Table column names.
      * @param colocationColNames Colocation columns names.
      */
-    public static void validateIndices(
-            Collection<IndexDefinition> indices,
-            Collection<ColumnDefinition> cols,
+    public static void validateIndexes(
+            IndexDefinition index,
+            Collection<String> colNames,
             List<String> colocationColNames) {
-        Set<String> colNames = cols.stream().map(ColumnDefinition::name).collect(Collectors.toSet());
+        assert index instanceof ColumnarIndexDefinition : "Only columnar indices are supported.";
+        // Note: E.g. functional index is not columnar index as it index an expression result only.
 
-        for (IndexDefinition idx : indices) {
-            assert idx instanceof ColumnarIndexDefinition : "Only columnar indices are supported.";
-            // Note: E.g. functional index is not columnar index as it index an expression result only.
+        ColumnarIndexDefinition idx0 = (ColumnarIndexDefinition) index;
 
-            ColumnarIndexDefinition idx0 = (ColumnarIndexDefinition) idx;
+        if (!idx0.columns().stream().map(IndexColumnDefinition::name).allMatch(colNames::contains)) {
+            throw new IllegalStateException("Index column must exist in the schema.");
+        }
 
-            if (!idx0.columns().stream().map(IndexColumnDefinition::name).allMatch(colNames::contains)) {
-                throw new IllegalStateException("Index column must exist in the schema.");
-            }
+        if (idx0.unique() && !(idx0.columns().stream().map(IndexColumnDefinition::name).allMatch(colocationColNames::contains))) {
+            throw new IllegalStateException("Unique index must contains all colocation columns.");
+        }
+    }
+
+    /**
+     * Validate index enlisted columns.
+     *
+     * @param indexView Index info.
+     * @param tableColumns Table column names.
+     */
+    public static void validateColumns(TableIndexView indexView, Set<String> tableColumns) {
+        if (indexView instanceof SortedIndexView) {
+            var sortedIndexView = (SortedIndexView) indexView;
+
+            validateColumns(sortedIndexView.columns().namedListKeys(), tableColumns);
+        } else if (indexView instanceof HashIndexView) {
+            validateColumns(Arrays.asList(((HashIndexView) indexView).columnNames()), tableColumns);
+        } else {
+            throw new AssertionError("Unknown index type [type=" + (indexView != null ? indexView.getClass() : null) + ']');
+        }
+    }
+
+    private static void validateColumns(Iterable<String> indexedColumns, Set<String> tableColumns) {
+        if (CollectionUtils.nullOrEmpty(indexedColumns)) {
+            throw new IgniteInternalException(
+                    ErrorGroups.Index.INVALID_INDEX_DEFINITION_ERR,
+                    "At least one column should be specified by index definition"
+            );
+        }
 
-            if (idx0.unique() && !(idx0.columns().stream().map(IndexColumnDefinition::name).allMatch(colocationColNames::contains))) {
-                throw new IllegalStateException("Unique index must contains all colocation columns.");
+        for (var columnName : indexedColumns) {
+            if (!tableColumns.contains(columnName)) {
+                throw new IgniteInternalException(
+                        Table.COLUMN_NOT_FOUND_ERR,
+                        "Column not found [name=" + columnName + ']'
+                );
             }
         }
     }
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/definition/TableDefinitionImpl.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/definition/TableDefinitionImpl.java
index f3083105d0..8e8b859915 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/definition/TableDefinitionImpl.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/definition/TableDefinitionImpl.java
@@ -18,18 +18,14 @@
 package org.apache.ignite.internal.schema.definition;
 
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import org.apache.ignite.internal.schema.SchemaUtils;
 import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.schema.definition.ColumnDefinition;
 import org.apache.ignite.schema.definition.PrimaryKeyDefinition;
 import org.apache.ignite.schema.definition.TableDefinition;
-import org.apache.ignite.schema.definition.index.IndexDefinition;
 
 /**
  * Table.
@@ -42,9 +38,6 @@ public class TableDefinitionImpl extends AbstractSchemaObject implements TableDe
     /** Key columns. */
     private final LinkedHashMap<String, ColumnDefinition> colMap;
 
-    /** Indices. */
-    private final Map<String, IndexDefinition> indices;
-
     /** Cached key columns. */
     private final Set<String> keyCols;
 
@@ -58,20 +51,17 @@ public class TableDefinitionImpl extends AbstractSchemaObject implements TableDe
      * @param tableName            Table name.
      * @param colMap               Columns.
      * @param primaryKeyDefinition Primary key.
-     * @param indices              Indices.
      */
     public TableDefinitionImpl(
             String schemaName,
             String tableName,
             LinkedHashMap<String, ColumnDefinition> colMap,
-            PrimaryKeyDefinition primaryKeyDefinition,
-            Map<String, IndexDefinition> indices
+            PrimaryKeyDefinition primaryKeyDefinition
     ) {
         super(tableName);
 
         this.schemaName = schemaName;
         this.colMap = colMap;
-        this.indices = indices;
 
         keyCols = primaryKeyDefinition.columns();
         colocationCols = primaryKeyDefinition.colocationColumns();
@@ -107,12 +97,6 @@ public class TableDefinitionImpl extends AbstractSchemaObject implements TableDe
         return SchemaUtils.canonicalName(schemaName, name());
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public Collection<IndexDefinition> indices() {
-        return Collections.unmodifiableCollection(indices.values());
-    }
-
     /**
      * Check if specified column already exists.
      *
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaConfigurationTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaConfigurationTest.java
index 8b077b2a9a..64a9748f34 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaConfigurationTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaConfigurationTest.java
@@ -21,8 +21,11 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
+import java.util.List;
 import java.util.Map;
+import org.apache.ignite.internal.schema.testutils.builder.HashIndexDefinitionBuilder;
 import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
+import org.apache.ignite.internal.schema.testutils.builder.SortedIndexDefinitionBuilder;
 import org.apache.ignite.internal.schema.testutils.builder.TableDefinitionBuilder;
 import org.apache.ignite.schema.definition.ColumnType;
 import org.apache.ignite.schema.definition.SchemaObject;
@@ -57,24 +60,21 @@ public class SchemaConfigurationTest {
                                 .withColocationColumns(
                                         "affId") // Optional colocation declaration. If not set, all PK columns will be colocation cols.
                                 .build()
-                )
+                ).build();
 
-                // 'withIndex' single entry point allows extended index support.
-                // E.g. we may want to support Geo-index later with some plugin.
-                .withIndex(
-                        SchemaBuilders.sortedIndex("idx_1_sorted")
-                                .addIndexColumn("id").desc().done()
-                                .addIndexColumn("name").asc().done()
-                                .withHints(Map.of("INLINE_SIZE", "42", "INLINE_STRATEGY", "INLINE_HASH")) // In-line key-hash as well.
-                                .build()
-                )
+        SortedIndexDefinitionBuilder idxBuilderSorted = SchemaBuilders.sortedIndex("idx_1_sorted");
 
-                .withIndex(
-                        SchemaBuilders.hashIndex("idx_3_hash")
-                                .withColumns("id", "affId")
-                                .build()
-                )
+        idxBuilderSorted
+                .addIndexColumn("id").desc().done()
+                .addIndexColumn("name").asc().done()
+                .withHints(Map.of("INLINE_SIZE", "42", "INLINE_STRATEGY", "INLINE_HASH"))
+                .build();
+
+        HashIndexDefinitionBuilder idxBuilderHash = SchemaBuilders.hashIndex("idx_1_hash");
 
+        idxBuilderHash
+                .withColumns(List.of("col1", "col2"))
+                .withHints(Map.of("INLINE_SIZE", "42", "INLINE_STRATEGY", "INLINE_HASH"))
                 .build();
     }
 
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/TableValidatorImplTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/AbstractTableIndexValidatorTest.java
similarity index 66%
copy from modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/TableValidatorImplTest.java
copy to modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/AbstractTableIndexValidatorTest.java
index 2cddd5d378..b82510f4a8 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/TableValidatorImplTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/AbstractTableIndexValidatorTest.java
@@ -17,11 +17,6 @@
 
 package org.apache.ignite.internal.schema.configuration;
 
-import static org.apache.ignite.internal.configuration.validation.TestValidationUtil.mockValidationContext;
-import static org.apache.ignite.internal.configuration.validation.TestValidationUtil.validate;
-import static org.mockito.Mockito.mock;
-
-import org.apache.ignite.configuration.NamedListView;
 import org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.ConstantValueDefaultConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.EntryCountBudgetConfigurationSchema;
@@ -29,31 +24,25 @@ import org.apache.ignite.configuration.schemas.table.FunctionCallDefaultConfigur
 import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.NullValueDefaultConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.TableValidator;
-import org.apache.ignite.configuration.schemas.table.TableView;
 import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.configuration.schemas.table.UnlimitedBudgetConfigurationSchema;
-import org.apache.ignite.configuration.validation.ValidationContext;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.schema.configuration.schema.TestDataStorageConfigurationSchema;
-import org.junit.jupiter.api.Test;
+import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
+import org.apache.ignite.internal.schema.testutils.builder.TableDefinitionBuilder;
+import org.apache.ignite.schema.definition.ColumnType;
+import org.apache.ignite.schema.definition.TableDefinition;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
- * TableValidatorImplTest.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Base class for table and index validation tests.
  */
 @ExtendWith(ConfigurationExtension.class)
-public class TableValidatorImplTest {
+public class AbstractTableIndexValidatorTest {
     /** Basic table configuration to mutate and then validate. */
     @InjectConfiguration(
-            value = "mock.tables = [{\n"
-                    + "    name = schema.table,\n"
-                    + "    columns.id {type.type = STRING, nullable = true},\n"
-                    + "    primaryKey {columns = [id], colocationColumns = [id]},\n"
-                    + "    indices.foo {type = HASH, columnNames = [id]}\n"
-                    + "}]",
             polymorphicExtensions = {
                     HashIndexConfigurationSchema.class,
                     SortedIndexConfigurationSchema.class,
@@ -66,13 +55,26 @@ public class TableValidatorImplTest {
                     EntryCountBudgetConfigurationSchema.class
             }
     )
-    private TablesConfiguration tablesCfg;
+    protected TablesConfiguration tablesCfg;
+
+    @BeforeEach
+    public void setup() throws Exception {
+        final TableDefinitionBuilder builder = SchemaBuilders.tableBuilder("schema", "table");
+
+        TableDefinition def = builder
+                .columns(
+                        SchemaBuilders.column("id", ColumnType.INT32).build(),
+                        SchemaBuilders.column("affId", ColumnType.INT32).build(),
+                        SchemaBuilders.column("id2", ColumnType.string()).asNullable(true).build()
+                )
 
-    /** Tests that validator finds no issues in a simple valid configuration. */
-    @Test
-    public void testNoIssues() {
-        ValidationContext<NamedListView<TableView>> ctx = mockValidationContext(null, tablesCfg.tables().value());
+                .withPrimaryKey(
+                        SchemaBuilders.primaryKey()  // Declare index column in order.
+                                .withColumns("affId", "id")
+                                .withColocationColumns("affId")
+                                .build()
+                ).build();
 
-        validate(TableValidatorImpl.INSTANCE, mock(TableValidator.class), ctx, null);
+        tablesCfg.tables().change(c -> c.create("schema.table", tblChg -> SchemaConfigurationConverter.convert(def, tblChg))).get();
     }
 }
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/IndexValidatorImplTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/IndexValidatorImplTest.java
new file mode 100644
index 0000000000..a7a5483171
--- /dev/null
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/IndexValidatorImplTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema.configuration;
+
+import static org.apache.ignite.internal.configuration.validation.TestValidationUtil.validate;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.configuration.schemas.table.HashIndexChange;
+import org.apache.ignite.configuration.schemas.table.IndexValidator;
+import org.apache.ignite.configuration.schemas.table.TableChange;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexView;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.configuration.schemas.table.TablesView;
+import org.apache.ignite.configuration.validation.ValidationContext;
+import org.apache.ignite.internal.configuration.NamedListConfiguration;
+import org.junit.jupiter.api.Test;
+
+/** Index validator tests. */
+public class IndexValidatorImplTest extends AbstractTableIndexValidatorTest {
+    /** Tests that validator finds no issues in a simple valid configuration. */
+    @Test
+    public void testNoIssues() throws Exception {
+        ValidationContext<NamedListView<TableIndexView>> ctxIdx = mock(ValidationContext.class);
+
+        NamedConfigurationTree<TableConfiguration, TableView, TableChange> cfg0 = tablesCfg.tables();
+
+        List<UUID> ids = ((NamedListConfiguration<TableConfiguration, ?, ?>) cfg0).internalIds();
+
+        assertEquals(1, ids.size());
+
+        UUID tableId = ids.get(0);
+
+        tablesCfg.indexes().change(c -> c.create("schema.idx", idxCng -> idxCng.convert(HashIndexChange.class)
+                .changeColumnNames("ID2").changeTableId(tableId))).get();
+
+        TablesView view = tablesCfg.value();
+
+        when(ctxIdx.getNewRoot(any())).thenReturn(view);
+
+        validate(IndexValidatorImpl.INSTANCE, mock(IndexValidator.class), ctxIdx, null);
+    }
+}
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverterTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverterTest.java
index 8114a62bc6..45ecc1bbba 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverterTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverterTest.java
@@ -27,25 +27,36 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
+import org.apache.ignite.configuration.NamedConfigurationTree;
 import org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.ConstantValueDefaultConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.EntryCountBudgetConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.FunctionCallDefaultConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.HashIndexConfiguration;
 import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.IndexValidator;
 import org.apache.ignite.configuration.schemas.table.NullValueDefaultConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.SortedIndexView;
+import org.apache.ignite.configuration.schemas.table.TableChange;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexChange;
+import org.apache.ignite.configuration.schemas.table.TableIndexConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexView;
 import org.apache.ignite.configuration.schemas.table.TableValidator;
+import org.apache.ignite.configuration.schemas.table.TableView;
 import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.configuration.schemas.table.UnlimitedBudgetConfigurationSchema;
 import org.apache.ignite.internal.configuration.ConfigurationRegistry;
+import org.apache.ignite.internal.configuration.NamedListConfiguration;
 import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
 import org.apache.ignite.internal.schema.definition.ColumnDefinitionImpl;
 import org.apache.ignite.internal.schema.testutils.builder.HashIndexDefinitionBuilder;
@@ -60,8 +71,8 @@ import org.apache.ignite.schema.definition.DefaultValueGenerators;
 import org.apache.ignite.schema.definition.TableDefinition;
 import org.apache.ignite.schema.definition.index.HashIndexDefinition;
 import org.apache.ignite.schema.definition.index.IndexColumnDefinition;
-import org.apache.ignite.schema.definition.index.IndexDefinition;
 import org.apache.ignite.schema.definition.index.SortOrder;
+import org.apache.ignite.schema.definition.index.SortedIndexColumnDefinition;
 import org.apache.ignite.schema.definition.index.SortedIndexDefinition;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -81,6 +92,9 @@ public class SchemaConfigurationConverterTest extends AbstractSchemaConverterTes
     /** Configuration registry with one table for each test. */
     private ConfigurationRegistry confRegistry;
 
+    /** Registered table id. */
+    private UUID tableId;
+
     /**
      * Prepare configuration registry for test.
      *
@@ -88,15 +102,17 @@ public class SchemaConfigurationConverterTest extends AbstractSchemaConverterTes
      * @throws InterruptedException If failed.
      */
     @BeforeEach
-    public void createRegistry() throws ExecutionException, InterruptedException {
+    public void createRegistry() throws Exception {
         confRegistry = new ConfigurationRegistry(
                 List.of(TablesConfiguration.KEY),
-                Map.of(TableValidator.class, Set.of(TableValidatorImpl.INSTANCE)),
+                Map.of(TableValidator.class, Set.of(TableValidatorImpl.INSTANCE),
+                        IndexValidator.class, Set.of(IndexValidatorImpl.INSTANCE)),
                 new TestConfigurationStorage(DISTRIBUTED),
                 List.of(),
                 List.of(
                         HashIndexConfigurationSchema.class,
                         SortedIndexConfigurationSchema.class,
+
                         UnknownDataStorageConfigurationSchema.class,
                         ConstantValueDefaultConfigurationSchema.class,
                         FunctionCallDefaultConfigurationSchema.class,
@@ -125,6 +141,14 @@ public class SchemaConfigurationConverterTest extends AbstractSchemaConverterTes
                                 tblsCh -> tblsCh.update(tbl.canonicalName(), tblCh -> tblCh.changeReplicas(1).changeTableId(1))
                         )
         ).get();
+
+        NamedConfigurationTree<TableConfiguration, TableView, TableChange> cfg0 = getConfiguration().tables();
+
+        List<UUID> ids = ((NamedListConfiguration<TableConfiguration, ?, ?>) cfg0).internalIds();
+
+        assertEquals(1, ids.size());
+
+        tableId = ids.get(0);
     }
 
     @AfterEach
@@ -142,15 +166,16 @@ public class SchemaConfigurationConverterTest extends AbstractSchemaConverterTes
                 .withHints(Collections.singletonMap("param", "value"));
         HashIndexDefinition idx = builder.build();
 
-        getTbl().change(ch -> SchemaConfigurationConverter.addIndex(idx, ch)).get();
+        getConfiguration().indexes().change(change -> change.create(idx.name(), ch ->
+                SchemaConfigurationConverter.addIndex(idx, tableId, ch))).get();
 
-        TableDefinition tbl = SchemaConfigurationConverter.convert(getTbl().value());
+        HashIndexConfiguration idxCfg = (HashIndexConfiguration) getConfiguration().indexes().get(idx.name());
 
-        HashIndexDefinition idx2 = (HashIndexDefinition) getIdx(idx.name(), tbl.indices());
+        assertNotNull(idxCfg);
 
-        assertNotNull(idx2);
-        assertEquals("HASH", idx2.type());
-        assertEquals(3, idx2.columns().size());
+        assertEquals("HASH", idxCfg.type().value());
+        assertEquals(3, idxCfg.columnNames().value().length);
+        assertEquals(idxCfg.tableId().value(), tableId);
     }
 
     /**
@@ -165,19 +190,22 @@ public class SchemaConfigurationConverterTest extends AbstractSchemaConverterTes
 
         SortedIndexDefinition idx = builder.build();
 
-        getTbl().change(ch -> SchemaConfigurationConverter.addIndex(idx, ch)).get();
+        NamedConfigurationTree<TableIndexConfiguration, TableIndexView, TableIndexChange> idxConfig = getConfiguration().indexes();
 
-        TableDefinition tbl = SchemaConfigurationConverter.convert(getTbl().value());
+        idxConfig.change(change -> change.create(idx.name(), ch ->
+                SchemaConfigurationConverter.addIndex(idx, tableId, ch))).get();;
 
-        SortedIndexDefinition idx2 = (SortedIndexDefinition) getIdx(idx.name(), tbl.indices());
+        SortedIndexView idx2 = (SortedIndexView) idxConfig.get(idx.name()).value();
 
         assertNotNull(idx2);
         assertEquals("SORTED", idx2.type());
         assertEquals(2, idx2.columns().size());
         assertEquals("A", idx2.columns().get(0).name());
         assertEquals("B", idx2.columns().get(1).name());
-        assertEquals(SortOrder.ASC, idx2.columns().get(0).sortOrder());
-        assertEquals(SortOrder.DESC, idx2.columns().get(1).sortOrder());
+        SortedIndexColumnDefinition col0 = SchemaConfigurationConverter.convert(idx2.columns().get(0));
+        SortedIndexColumnDefinition col1 = SchemaConfigurationConverter.convert(idx2.columns().get(1));
+        assertEquals(SortOrder.ASC, col0.sortOrder());
+        assertEquals(SortOrder.DESC, col1.sortOrder());
     }
 
     /**
@@ -190,18 +218,17 @@ public class SchemaConfigurationConverterTest extends AbstractSchemaConverterTes
                 .unique(true)
                 .build();
 
-        getTbl().change(ch -> SchemaConfigurationConverter.addIndex(idx, ch)).get();
+        getConfiguration().indexes().change(change -> change.create(idx.name(), ch ->
+                SchemaConfigurationConverter.addIndex(idx, tableId, ch))).get();
 
-        TableDefinition tbl = SchemaConfigurationConverter.convert(getTbl().value());
-
-        SortedIndexDefinition idx2 = (SortedIndexDefinition) getIdx(idx.name(), tbl.indices());
+        SortedIndexView idx2 = (SortedIndexView) getConfiguration().indexes().get(idx.name()).value();
 
         assertNotNull(idx2);
         assertEquals("PK_SORTED", idx2.name());
         assertEquals("SORTED", idx2.type());
         assertEquals(idx.columns().stream().map(IndexColumnDefinition::name).collect(Collectors.toList()),
-                idx2.columns().stream().map(IndexColumnDefinition::name).collect(Collectors.toList()));
-        assertTrue(idx2.unique());
+                new ArrayList<>(idx2.columns().namedListKeys()));
+        assertTrue(idx2.uniq());
     }
 
     /**
@@ -215,23 +242,24 @@ public class SchemaConfigurationConverterTest extends AbstractSchemaConverterTes
                 .addIndexColumn("COL1").desc().done()
                 .build();
 
-        getTbl().change(ch -> SchemaConfigurationConverter.addIndex(idx, ch)).get();
-
-        TableDefinition tbl = SchemaConfigurationConverter.convert(getTbl().value());
+        getConfiguration().indexes().change(change -> change.create(idx.canonicalName(), ch ->
+                SchemaConfigurationConverter.addIndex(idx, tableId, ch))).get();
 
-        SortedIndexDefinition idx2 = (SortedIndexDefinition) getIdx(idx.name(), tbl.indices());
+        SortedIndexView idx2 = (SortedIndexView) getConfiguration().indexes().get(idx.name()).value();
 
         assertNotNull(idx2);
-        assertEquals("uniq_sorted", idx2.name());
+        assertEquals("uniq_sorted", idx2.name().toLowerCase(Locale.US));
         assertEquals("SORTED", idx2.type());
 
-        assertTrue(idx2.unique());
+        assertTrue(idx2.uniq());
 
         assertEquals(2, idx2.columns().size());
         assertEquals("A", idx2.columns().get(0).name());
         assertEquals("COL1", idx2.columns().get(1).name());
-        assertEquals(SortOrder.ASC, idx2.columns().get(0).sortOrder());
-        assertEquals(SortOrder.DESC, idx2.columns().get(1).sortOrder());
+        SortedIndexColumnDefinition col0 = SchemaConfigurationConverter.convert(idx2.columns().get(0));
+        SortedIndexColumnDefinition col1 = SchemaConfigurationConverter.convert(idx2.columns().get(1));
+        assertEquals(SortOrder.ASC, col0.sortOrder());
+        assertEquals(SortOrder.DESC, col1.sortOrder());
     }
 
     /**
@@ -246,7 +274,6 @@ public class SchemaConfigurationConverterTest extends AbstractSchemaConverterTes
         TableDefinition tbl2 = SchemaConfigurationConverter.convert(tblCfg);
 
         assertEquals(tbl.canonicalName(), tbl2.canonicalName());
-        assertEquals(tbl.indices().size(), tbl2.indices().size());
         assertEquals(tbl.keyColumns().size(), tbl2.keyColumns().size());
         assertEquals(tbl.colocationColumns().size(), tbl2.colocationColumns().size());
         assertEquals(tbl.columns().size(), tbl2.columns().size());
@@ -338,23 +365,12 @@ public class SchemaConfigurationConverterTest extends AbstractSchemaConverterTes
     }
 
     /**
-     * Get tests default table configuration.
-     *
-     * @return Configuration of default table.
-     */
-    private TableConfiguration getTbl() {
-        return confRegistry.getConfiguration(TablesConfiguration.KEY).tables().get(tblBuilder.build().canonicalName());
-    }
-
-    /**
-     * Get table index by name.
+     * Get tests default index configuration.
      *
-     * @param name Index name to find.
-     * @param idxs Table indexes.
-     * @return Index or {@code null} if there are no index with such name.
+     * @return Indexes configuration.
      */
-    private IndexDefinition getIdx(String name, Collection<IndexDefinition> idxs) {
-        return idxs.stream().filter(idx -> name.equals(idx.name())).findAny().orElse(null);
+    private TablesConfiguration getConfiguration() {
+        return confRegistry.getConfiguration(TablesConfiguration.KEY);
     }
 
     private static Iterable<DefaultValueArg> generateTestArguments() {
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/TableValidatorImplTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/TableValidatorImplTest.java
index 2cddd5d378..6ec53ae88d 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/TableValidatorImplTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/TableValidatorImplTest.java
@@ -22,52 +22,16 @@ import static org.apache.ignite.internal.configuration.validation.TestValidation
 import static org.mockito.Mockito.mock;
 
 import org.apache.ignite.configuration.NamedListView;
-import org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.ConstantValueDefaultConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.EntryCountBudgetConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.FunctionCallDefaultConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.NullValueDefaultConfigurationSchema;
-import org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.TableValidator;
 import org.apache.ignite.configuration.schemas.table.TableView;
-import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
-import org.apache.ignite.configuration.schemas.table.UnlimitedBudgetConfigurationSchema;
 import org.apache.ignite.configuration.validation.ValidationContext;
-import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
-import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.schema.configuration.schema.TestDataStorageConfigurationSchema;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
  * TableValidatorImplTest.
  * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
  */
-@ExtendWith(ConfigurationExtension.class)
-public class TableValidatorImplTest {
-    /** Basic table configuration to mutate and then validate. */
-    @InjectConfiguration(
-            value = "mock.tables = [{\n"
-                    + "    name = schema.table,\n"
-                    + "    columns.id {type.type = STRING, nullable = true},\n"
-                    + "    primaryKey {columns = [id], colocationColumns = [id]},\n"
-                    + "    indices.foo {type = HASH, columnNames = [id]}\n"
-                    + "}]",
-            polymorphicExtensions = {
-                    HashIndexConfigurationSchema.class,
-                    SortedIndexConfigurationSchema.class,
-                    UnknownDataStorageConfigurationSchema.class,
-                    TestDataStorageConfigurationSchema.class,
-                    ConstantValueDefaultConfigurationSchema.class,
-                    FunctionCallDefaultConfigurationSchema.class,
-                    NullValueDefaultConfigurationSchema.class,
-                    UnlimitedBudgetConfigurationSchema.class,
-                    EntryCountBudgetConfigurationSchema.class
-            }
-    )
-    private TablesConfiguration tablesCfg;
-
+public class TableValidatorImplTest extends AbstractTableIndexValidatorTest {
     /** Tests that validator finds no issues in a simple valid configuration. */
     @Test
     public void testNoIssues() {
diff --git a/modules/schema/src/testFixtures/java/org/apache/ignite/internal/schema/testutils/builder/TableDefinitionBuilder.java b/modules/schema/src/testFixtures/java/org/apache/ignite/internal/schema/testutils/builder/TableDefinitionBuilder.java
index 8c91502ba6..30cb134ac3 100644
--- a/modules/schema/src/testFixtures/java/org/apache/ignite/internal/schema/testutils/builder/TableDefinitionBuilder.java
+++ b/modules/schema/src/testFixtures/java/org/apache/ignite/internal/schema/testutils/builder/TableDefinitionBuilder.java
@@ -22,7 +22,6 @@ import java.util.Map;
 import org.apache.ignite.schema.definition.ColumnDefinition;
 import org.apache.ignite.schema.definition.PrimaryKeyDefinition;
 import org.apache.ignite.schema.definition.TableDefinition;
-import org.apache.ignite.schema.definition.index.IndexDefinition;
 
 /**
  * Table definition builder.
@@ -44,14 +43,6 @@ public interface TableDefinitionBuilder extends SchemaObjectBuilder {
      */
     TableDefinitionBuilder columns(ColumnDefinition... columns);
 
-    /**
-     * Adds an index.
-     *
-     * @param indexDefinition Table index definition.
-     * @return {@code This} for chaining.
-     */
-    TableDefinitionBuilder withIndex(IndexDefinition indexDefinition);
-
     /**
      * Shortcut method for adding {@link PrimaryKeyDefinition} of single column.
      *
diff --git a/modules/schema/src/testFixtures/java/org/apache/ignite/internal/schema/testutils/builder/TableDefinitionBuilderImpl.java b/modules/schema/src/testFixtures/java/org/apache/ignite/internal/schema/testutils/builder/TableDefinitionBuilderImpl.java
index 6ce59fc727..9ed3dcbb86 100644
--- a/modules/schema/src/testFixtures/java/org/apache/ignite/internal/schema/testutils/builder/TableDefinitionBuilderImpl.java
+++ b/modules/schema/src/testFixtures/java/org/apache/ignite/internal/schema/testutils/builder/TableDefinitionBuilderImpl.java
@@ -18,8 +18,6 @@
 package org.apache.ignite.internal.schema.testutils.builder;
 
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -29,7 +27,6 @@ import org.apache.ignite.internal.util.IgniteObjectName;
 import org.apache.ignite.schema.definition.ColumnDefinition;
 import org.apache.ignite.schema.definition.PrimaryKeyDefinition;
 import org.apache.ignite.schema.definition.TableDefinition;
-import org.apache.ignite.schema.definition.index.IndexDefinition;
 
 /**
  * Table builder.
@@ -44,9 +41,6 @@ class TableDefinitionBuilderImpl implements TableDefinitionBuilder {
     /** Columns definitions. */
     private final LinkedHashMap<String, ColumnDefinition> columns = new LinkedHashMap<>();
 
-    /** Indices definitions. */
-    private final Map<String, IndexDefinition> indices = new HashMap<>();
-
     /** Table primary key. */
     private PrimaryKeyDefinition primaryKeyDefinition;
 
@@ -79,16 +73,6 @@ class TableDefinitionBuilderImpl implements TableDefinitionBuilder {
         return columns(Arrays.asList(columns));
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public TableDefinitionBuilder withIndex(IndexDefinition indexDefinition) {
-        if (indices.put(indexDefinition.name(), indexDefinition) != null) {
-            throw new IllegalArgumentException("Index with same name already exists: " + indexDefinition.name());
-        }
-
-        return this;
-    }
-
     /** {@inheritDoc} */
     @Override
     public TableDefinitionBuilder withPrimaryKey(String colName) {
@@ -121,14 +105,12 @@ class TableDefinitionBuilderImpl implements TableDefinitionBuilder {
         assert columns.size() > primaryKeyDefinition.columns().size() : "Key or/and value columns must be defined.";
 
         SchemaValidationUtils.validatePrimaryKey(primaryKeyDefinition.columns(), columns);
-        SchemaValidationUtils.validateIndices(indices.values(), columns.values(), primaryKeyDefinition.colocationColumns());
 
         return new TableDefinitionImpl(
                 schemaName,
                 tableName,
                 columns,
-                primaryKeyDefinition,
-                Collections.unmodifiableMap(indices)
+                primaryKeyDefinition
         );
     }
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 7db2133705..1b6bc83fc9 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -526,6 +526,7 @@ public class SqlQueryProcessor implements QueryProcessor {
         @Override
         public CompletableFuture<Boolean> notify(@NotNull TableEventParameters parameters, @Nullable Throwable exception) {
             return schemaHolder.onTableCreated(
+                    // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas
                     "PUBLIC",
                     parameters.table(),
                     parameters.causalityToken()
@@ -545,6 +546,7 @@ public class SqlQueryProcessor implements QueryProcessor {
         @Override
         public CompletableFuture<Boolean> notify(@NotNull TableEventParameters parameters, @Nullable Throwable exception) {
             return schemaHolder.onTableUpdated(
+                    // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas
                     "PUBLIC",
                     parameters.table(),
                     parameters.causalityToken()
@@ -564,6 +566,7 @@ public class SqlQueryProcessor implements QueryProcessor {
         @Override
         public CompletableFuture<Boolean> notify(@NotNull TableEventParameters parameters, @Nullable Throwable exception) {
             return schemaHolder.onTableDropped(
+                    // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas
                     "PUBLIC",
                     parameters.tableName(),
                     parameters.causalityToken()
@@ -581,6 +584,7 @@ public class SqlQueryProcessor implements QueryProcessor {
         @Override
         public CompletableFuture<Boolean> notify(@NotNull IndexEventParameters parameters, @Nullable Throwable exception) {
             return schemaHolder.onIndexDropped(
+                            // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas
                             "PUBLIC",
                             parameters.indexId(),
                             parameters.causalityToken()
@@ -598,7 +602,6 @@ public class SqlQueryProcessor implements QueryProcessor {
         @Override
         public CompletableFuture<Boolean> notify(@NotNull IndexEventParameters parameters, @Nullable Throwable exception) {
             return schemaHolder.onIndexCreated(
-                            "PUBLIC",
                             parameters.index(),
                             parameters.causalityToken()
                     )
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 829dba2404..60c44c413f 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -37,6 +37,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.configuration.ConfigurationChangeException;
 import org.apache.ignite.internal.index.IndexManager;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -264,6 +265,12 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
             e = e.getCause();
         }
 
+        if (e instanceof ConfigurationChangeException) {
+            assert e.getCause() != null;
+            // Cut off upper configuration error`s as uninformative.
+            e = e.getCause();
+        }
+
         if (e instanceof IgniteInternalCheckedException) {
             return new IgniteInternalException("Failed to execute DDL statement [stmt=" /*+ qry.sql()*/
                     + ", err=" + e.getMessage() + ']', e);
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
index 76395fe133..5ff26fa0ec 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
@@ -250,8 +250,7 @@ public class DdlCommandHandler {
                         cmd.indexName(),
                         cmd.tableName(),
                         !cmd.ifNotExists(),
-                        indexChanger)
-                .thenApply(Objects::nonNull);
+                        indexChanger);
     }
 
     /** Handles drop index command. */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
index 928f481e86..398b545685 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.engine.schema;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.schema.SchemaUtils.extractSchema;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 
@@ -383,12 +384,11 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager {
     /**
      * Index created callback method register index in Calcite schema.
      *
-     * @param schemaName Schema name.
      * @param index Index instance.
      * @param causalityToken Causality token.
      * @return Schema registration future.
      */
-    public synchronized CompletableFuture<?> onIndexCreated(String schemaName, Index<?> index, long causalityToken) {
+    public synchronized CompletableFuture<?> onIndexCreated(Index<?> index, long causalityToken) {
         if (!busyLock.enterBusy()) {
             return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()));
         }
@@ -398,6 +398,8 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager {
                     return failedFuture(e);
                 }
 
+                String schemaName = extractSchema(index.name());
+
                 Map<String, IgniteSchema> res = new HashMap<>(schemas);
 
                 IgniteSchema schema = res.compute(schemaName,
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index b0a2ed5434..6ab73fc3a0 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -270,7 +270,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
 
         tblManager = mockManagers();
 
-        idxManager = new IndexManager(tblManager, (lsnr) -> {});
+        idxManager = new IndexManager(tblsCfg);
 
         idxManager.start();
 
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
index 97182cd42b..2f54bd1d65 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
@@ -267,7 +267,7 @@ public class SqlSchemaManagerTest {
     }
 
     @Test
-    public void testIndexEventHandler() {
+    public void testIndexEventHandler() throws Exception {
         InternalTable mock = mock(InternalTable.class);
         when(mock.tableId()).thenReturn(tableId);
 
@@ -285,12 +285,12 @@ public class SqlSchemaManagerTest {
         IndexDescriptor descMock = mock(IndexDescriptor.class);
         when(descMock.columns()).thenReturn(List.of());
 
-        when(index.name()).thenReturn("I");
+        when(index.name()).thenReturn("TEST_SCHEMA.I");
         when(index.id()).thenReturn(indexId);
         when(index.tableId()).thenReturn(tableId);
         when(index.descriptor()).thenReturn(descMock);
 
-        sqlSchemaManager.onIndexCreated("TEST_SCHEMA", index, testRevisionRegister.actualToken() + 1);
+        sqlSchemaManager.onIndexCreated(index, testRevisionRegister.actualToken() + 1);
         testRevisionRegister.moveForward();
 
         IgniteSchema schema = sqlSchemaManager.schema("TEST_SCHEMA").unwrap(IgniteSchema.class);
@@ -302,7 +302,7 @@ public class SqlSchemaManagerTest {
         IgniteTableImpl igniteTable = assertInstanceOf(IgniteTableImpl.class, schemaTable);
 
         assertEquals(igniteTable.id(), igniteIndex.index().tableId());
-        assertSame(igniteIndex, igniteTable.indexes().get("I"));
+        assertSame(igniteIndex, igniteTable.indexes().get("TEST_SCHEMA.I"));
 
         sqlSchemaManager.onIndexDropped("TEST_SCHEMA", indexId, testRevisionRegister.actualToken() + 1);
         testRevisionRegister.moveForward();
@@ -314,7 +314,7 @@ public class SqlSchemaManagerTest {
 
 
     @Test
-    public void testIndexEventsProcessed() {
+    public void testIndexEventsProcessed() throws Exception {
         InternalTable mock = mock(InternalTable.class);
         when(mock.tableId()).thenReturn(tableId);
 
@@ -330,7 +330,9 @@ public class SqlSchemaManagerTest {
         IndexDescriptor descMock = mock(IndexDescriptor.class);
         when(descMock.columns()).thenReturn(List.of());
 
-        when(index.name()).thenReturn("I");
+        String idxName = "TEST_SCHEMA.I";
+
+        when(index.name()).thenReturn(idxName);
         when(index.id()).thenReturn(indexId);
         when(index.tableId()).thenReturn(tableId);
         when(index.descriptor()).thenReturn(descMock);
@@ -338,7 +340,7 @@ public class SqlSchemaManagerTest {
         {
             SchemaPlus schema1 = sqlSchemaManager.schema("TEST_SCHEMA");
 
-            sqlSchemaManager.onIndexCreated("TEST_SCHEMA", index, testRevisionRegister.actualToken() + 1);
+            sqlSchemaManager.onIndexCreated(index, testRevisionRegister.actualToken() + 1);
             testRevisionRegister.moveForward();
 
             SchemaPlus schema2 = sqlSchemaManager.schema("TEST_SCHEMA");
@@ -350,8 +352,8 @@ public class SqlSchemaManagerTest {
             assertNull(schema1.unwrap(IgniteSchema.class).index(indexId));
             assertNotNull(schema2.unwrap(IgniteSchema.class).index(indexId));
 
-            assertNull(((InternalIgniteTable) schema1.getTable("T")).getIndex("I"));
-            assertNotNull(((InternalIgniteTable) schema2.getTable("T")).getIndex("I"));
+            assertNull(((InternalIgniteTable) schema1.getTable("T")).getIndex(idxName));
+            assertNotNull(((InternalIgniteTable) schema2.getTable("T")).getIndex(idxName));
         }
         {
             sqlSchemaManager.onIndexDropped("TEST_SCHEMA", indexId, testRevisionRegister.actualToken() + 1);
@@ -367,8 +369,8 @@ public class SqlSchemaManagerTest {
             assertNotNull(schema1.unwrap(IgniteSchema.class).index(indexId));
             assertNull(schema2.unwrap(IgniteSchema.class).index(indexId));
 
-            assertNull(((InternalIgniteTable) schema2.getTable("T")).getIndex("I"));
-            assertNotNull(((InternalIgniteTable) schema1.getTable("T")).getIndex("I"));
+            assertNull(((InternalIgniteTable) schema2.getTable("T")).getIndex(idxName));
+            assertNotNull(((InternalIgniteTable) schema1.getTable("T")).getIndex(idxName));
         }
 
         verifyNoMoreInteractions(tableManager);
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
index ec8b653de6..1693cd86f1 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.storage.engine;
 
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.internal.storage.StorageException;
 
 /**
@@ -55,7 +56,8 @@ public interface StorageEngine {
      * Creates new table storage.
      *
      * @param tableCfg Table configuration.
+     * @param tablesCfg Tables configuration.
      * @throws StorageException If an error has occurs while creating the table.
      */
-    MvTableStorage createMvTable(TableConfiguration tableCfg) throws StorageException;
+    MvTableStorage createMvTable(TableConfiguration tableCfg, TablesConfiguration tablesCfg) throws StorageException;
 }
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexDescriptor.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexDescriptor.java
index 17e2acf664..b547a30543 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexDescriptor.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexDescriptor.java
@@ -26,6 +26,7 @@ import org.apache.ignite.configuration.schemas.table.ColumnView;
 import org.apache.ignite.configuration.schemas.table.HashIndexView;
 import org.apache.ignite.configuration.schemas.table.TableIndexView;
 import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.configuration.schemas.table.TablesView;
 import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
 import org.apache.ignite.internal.schema.NativeType;
 import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
@@ -89,11 +90,14 @@ public class HashIndexDescriptor {
     /**
      * Creates an Index Descriptor from a given Table Configuration.
      *
-     * @param indexId Index ID.
      * @param tableConfig Table configuration.
+     * @param tablesConfig Tables and indexes configuration.
+     * @param indexId Index id.
+     *
      */
-    public HashIndexDescriptor(UUID indexId, TableView tableConfig) {
-        TableIndexView indexConfig = ConfigurationUtil.getByInternalId(tableConfig.indices(), indexId);
+    // TODO: IGNITE-17727 Fix redundant param.
+    public HashIndexDescriptor(UUID indexId, TableView tableConfig, TablesView tablesConfig) {
+        TableIndexView indexConfig = ConfigurationUtil.getByInternalId(tablesConfig.indexes(), indexId);
 
         if (indexConfig == null) {
             throw new StorageException(String.format("Index configuration for \"%s\" could not be found", indexId));
@@ -102,11 +106,11 @@ public class HashIndexDescriptor {
         if (!(indexConfig instanceof HashIndexView)) {
             throw new StorageException(String.format(
                     "Index \"%s\" is not configured as a Hash Index. Actual type: %s",
-                    indexId, indexConfig.type()
+                    indexConfig.id(), indexConfig.type()
             ));
         }
 
-        this.id = indexConfig.id();
+        this.id = indexId;
 
         String[] indexColumns = ((HashIndexView) indexConfig).columnNames();
 
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexDescriptor.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexDescriptor.java
index 0c5c94bc3e..59a1c220e9 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexDescriptor.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexDescriptor.java
@@ -27,6 +27,7 @@ import org.apache.ignite.configuration.schemas.table.IndexColumnView;
 import org.apache.ignite.configuration.schemas.table.SortedIndexView;
 import org.apache.ignite.configuration.schemas.table.TableIndexView;
 import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.configuration.schemas.table.TablesView;
 import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
 import org.apache.ignite.internal.schema.NativeType;
 import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
@@ -103,10 +104,9 @@ public class SortedIndexDescriptor {
      * @param indexId index ID.
      * @param tableConfig table configuration.
      */
-    public SortedIndexDescriptor(UUID indexId, TableView tableConfig) {
-        this.id = indexId;
-
-        TableIndexView indexConfig = ConfigurationUtil.getByInternalId(tableConfig.indices(), indexId);
+    // TODO: IGNITE-17727 Fix redundant param.
+    public SortedIndexDescriptor(UUID indexId, TableView tableConfig, TablesView tablesConfig) {
+        TableIndexView indexConfig = ConfigurationUtil.getByInternalId(tablesConfig.indexes(), indexId);
 
         if (indexConfig == null) {
             throw new StorageException(String.format("Index configuration for \"%s\" could not be found", indexId));
@@ -119,6 +119,8 @@ public class SortedIndexDescriptor {
             ));
         }
 
+        this.id = indexId;
+
         NamedListView<? extends IndexColumnView> indexColumns = ((SortedIndexView) indexConfig).columns();
 
         columns = indexColumns.namedListKeys().stream()
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/ConcurrentHashMapMvTableStorageTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/ConcurrentHashMapMvTableStorageTest.java
index 06e581b392..459d4cff90 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/ConcurrentHashMapMvTableStorageTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/ConcurrentHashMapMvTableStorageTest.java
@@ -21,6 +21,8 @@ import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchem
 import org.apache.ignite.configuration.schemas.table.NullValueDefaultConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexView;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.configuration.schemas.table.UnlimitedBudgetConfigurationSchema;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
@@ -45,10 +47,15 @@ public class ConcurrentHashMapMvTableStorageTest extends AbstractMvTableStorageT
             },
             value = "mock.dataStorage.name = " + TestConcurrentHashMapStorageEngine.ENGINE_NAME
     )
-    private TableConfiguration tableConfig;
+    private TableConfiguration tableConfig0;
 
     @Override
-    protected MvTableStorage tableStorage() {
-        return new TestConcurrentHashMapMvTableStorage(tableConfig);
+    protected void setUp() {
+        super.tableConfig = tableConfig0;
+    }
+
+    @Override
+    protected MvTableStorage tableStorage(TableIndexView sortedIdx, TableIndexView hashIdx, TablesConfiguration tablesCfg) {
+        return new TestConcurrentHashMapMvTableStorage(tableConfig, tablesCfg);
     }
 }
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
index fbdf675710..3aceb2da46 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
@@ -49,6 +49,7 @@ import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
 import org.apache.ignite.configuration.schemas.table.TableIndexView;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.schema.BinaryTuple;
@@ -86,6 +87,8 @@ public abstract class AbstractSortedIndexStorageTest {
 
     private static final int TEST_PARTITION = 0;
 
+    private Random random;
+
     private static List<ColumnDefinition> allTypesColumnDefinitions() {
         Stream<ColumnType> allColumnTypes = Stream.of(
                 ColumnType.INT8,
@@ -111,12 +114,12 @@ public abstract class AbstractSortedIndexStorageTest {
                 .collect(toUnmodifiableList());
     }
 
-    private final Random random;
-
     private MvTableStorage tableStorage;
 
     private MvPartitionStorage partitionStorage;
 
+    private TablesConfiguration tablesCfg;
+
     protected AbstractSortedIndexStorageTest() {
         long seed = System.currentTimeMillis();
 
@@ -130,12 +133,17 @@ public abstract class AbstractSortedIndexStorageTest {
      *
      * <p>This method *MUST* always be called in either subclass' constructor or setUp method.
      */
-    protected final void initialize(MvTableStorage tableStorage) {
+    protected final void initialize(TableConfiguration tableCfg, TablesConfiguration tablesCfg) {
+        this.tablesCfg = tablesCfg;
+
+        createTestTable(tableCfg);
+    }
+
+    /** Set up internal storage implementation. */
+    protected final void initializeStorage(MvTableStorage tableStorage) {
         this.tableStorage = tableStorage;
 
         this.partitionStorage = tableStorage.getOrCreateMvPartition(TEST_PARTITION);
-
-        createTestTable(tableStorage.configuration());
     }
 
     /**
@@ -184,15 +192,12 @@ public abstract class AbstractSortedIndexStorageTest {
      * Creates a Sorted Index using the given index definition.
      */
     private SortedIndexStorage createIndexStorage(ColumnarIndexDefinition indexDefinition) {
-        TableConfiguration tableCfg = tableStorage.configuration();
-
-        CompletableFuture<Void> createIndexFuture = tableCfg.change(cfg ->
-                cfg.changeIndices(idxList ->
-                        idxList.create(indexDefinition.name(), idx -> convert(indexDefinition, idx))));
+        CompletableFuture<Void> createIndexFuture =
+                tablesCfg.indexes().change(chg -> chg.create(indexDefinition.name(), idx -> convert(indexDefinition, idx)));
 
         assertThat(createIndexFuture, willBe(nullValue(Void.class)));
 
-        TableIndexView indexConfig = tableCfg.value().indices().get(indexDefinition.name());
+        TableIndexView indexConfig = tablesCfg.indexes().get(indexDefinition.name()).value();
 
         return tableStorage.getOrCreateSortedIndex(0, indexConfig.id());
     }
@@ -235,7 +240,7 @@ public abstract class AbstractSortedIndexStorageTest {
     }
 
     /**
-     * Tests that it adding a row that already exists does not do anything.
+     * Tests that appending an already existing row does no harm.
      */
     @Test
     void testPutIdempotence() throws Exception {
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/TestHashIndexStorageTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/TestHashIndexStorageTest.java
index ae1783c0eb..0d15c37249 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/TestHashIndexStorageTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/TestHashIndexStorageTest.java
@@ -17,9 +17,14 @@
 
 package org.apache.ignite.internal.storage.index;
 
+import org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.ConstantValueDefaultConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.EntryCountBudgetConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.FunctionCallDefaultConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.NullValueDefaultConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.configuration.schemas.table.UnlimitedBudgetConfigurationSchema;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
@@ -45,8 +50,19 @@ public class TestHashIndexStorageTest extends AbstractHashIndexStorageTest {
                     },
                     value = "mock.dataStorage.name = " + TestConcurrentHashMapStorageEngine.ENGINE_NAME
             )
-            TableConfiguration tableCfg
+            TableConfiguration tableCfg,
+
+            @InjectConfiguration(polymorphicExtensions = {
+                    HashIndexConfigurationSchema.class,
+                    UnknownDataStorageConfigurationSchema.class,
+                    ConstantValueDefaultConfigurationSchema.class,
+                    FunctionCallDefaultConfigurationSchema.class,
+                    NullValueDefaultConfigurationSchema.class,
+                    UnlimitedBudgetConfigurationSchema.class,
+                    EntryCountBudgetConfigurationSchema.class
+            })
+            TablesConfiguration tablesConfig
     ) {
-        initialize(new TestConcurrentHashMapMvTableStorage(tableCfg));
+        initialize(new TestConcurrentHashMapMvTableStorage(tableCfg, tablesConfig), tablesConfig);
     }
 }
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/TestSortedIndexStorageTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/TestSortedIndexStorageTest.java
index facf8bcf99..46589ded86 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/TestSortedIndexStorageTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/index/TestSortedIndexStorageTest.java
@@ -17,9 +17,15 @@
 
 package org.apache.ignite.internal.storage.index;
 
+import org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.ConstantValueDefaultConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.EntryCountBudgetConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.FunctionCallDefaultConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.NullValueDefaultConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.configuration.schemas.table.UnlimitedBudgetConfigurationSchema;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
@@ -36,16 +42,32 @@ import org.junit.jupiter.api.extension.ExtendWith;
 @ExtendWith(ConfigurationExtension.class)
 public class TestSortedIndexStorageTest extends AbstractSortedIndexStorageTest {
     @BeforeEach
-    void setUp(@InjectConfiguration(
-            polymorphicExtensions = {
+    void setUp(
+            @InjectConfiguration(
+                    polymorphicExtensions = {
+                            TestConcurrentHashMapDataStorageConfigurationSchema.class,
+                            HashIndexConfigurationSchema.class,
+                            SortedIndexConfigurationSchema.class,
+                            NullValueDefaultConfigurationSchema.class,
+                            UnlimitedBudgetConfigurationSchema.class
+                    },
+                    value = "mock.dataStorage.name = " + TestConcurrentHashMapStorageEngine.ENGINE_NAME
+            )
+            TableConfiguration tableCfg,
+
+            @InjectConfiguration(polymorphicExtensions = {
+                    HashIndexConfigurationSchema.class,
                     SortedIndexConfigurationSchema.class,
-                    TestConcurrentHashMapDataStorageConfigurationSchema.class,
+                    UnknownDataStorageConfigurationSchema.class,
+                    ConstantValueDefaultConfigurationSchema.class,
+                    FunctionCallDefaultConfigurationSchema.class,
                     NullValueDefaultConfigurationSchema.class,
-                    UnlimitedBudgetConfigurationSchema.class
-            },
-            // This value only required for configuration validity, it's not used otherwise.
-            value = "mock.dataStorage.name = " + TestConcurrentHashMapStorageEngine.ENGINE_NAME
-    ) TableConfiguration tableCfg) {
-        initialize(new TestConcurrentHashMapMvTableStorage(tableCfg));
+                    UnlimitedBudgetConfigurationSchema.class,
+                    EntryCountBudgetConfigurationSchema.class
+            })
+            TablesConfiguration tablesConfig) {
+        initialize(tableCfg, tablesConfig);
+
+        initializeStorage(new TestConcurrentHashMapMvTableStorage(tableCfg, tablesConfig));
     }
 }
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index b0fc385fec..e8deddc46f 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.storage;
 
+import static org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
@@ -29,12 +30,24 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
-import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.ConstantValueDefaultConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.EntryCountBudgetConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.FunctionCallDefaultConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.NullValueDefaultConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
 import org.apache.ignite.configuration.schemas.table.TableIndexView;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.configuration.schemas.table.UnlimitedBudgetConfigurationSchema;
 import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.configuration.ConfigurationRegistry;
+import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTupleSchema;
 import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
@@ -64,24 +77,37 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
 
     protected MvTableStorage tableStorage;
 
-    private UUID sortedIndexId;
+    protected TableIndexView sortedIdx;
 
-    private UUID hashIndexId;
+    protected TableIndexView hashIdx;
 
-    protected abstract MvTableStorage tableStorage();
+    protected TableConfiguration tableConfig;
+
+    protected abstract MvTableStorage tableStorage(TableIndexView sortedIdx, TableIndexView hashIdx, TablesConfiguration tablesCfg);
+
+    protected abstract void setUp();
+
+    /** Configuration registry with one table for each test. */
+    private ConfigurationRegistry confRegistry;
 
     @BeforeEach
     void setUpBase() {
-        tableStorage = tableStorage();
+        startTestRegistry();
 
-        tableStorage.start();
+        TablesConfiguration tablesCfg = confRegistry.getConfiguration(TablesConfiguration.KEY);
+
+        createTestIndexes();
+
+        sortedIdx = tablesCfg.indexes().get(SORTED_INDEX_NAME).value();
+        hashIdx = tablesCfg.indexes().get(HASH_INDEX_NAME).value();
+
+        setUp();
 
         createTestTable();
 
-        NamedListView<TableIndexView> indexConfiguration = tableStorage.configuration().indices().value();
+        tableStorage = tableStorage(sortedIdx, hashIdx, tablesCfg);
 
-        sortedIndexId = indexConfiguration.get(SORTED_INDEX_NAME).id();
-        hashIndexId = indexConfiguration.get(HASH_INDEX_NAME).id();
+        tableStorage.start();
     }
 
     @AfterEach
@@ -139,12 +165,12 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
      */
     @Test
     public void testCreateSortedIndex() {
-        assertThrows(StorageException.class, () -> tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIndexId));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id()));
 
         // Index should only be available after the associated partition has been created.
         tableStorage.getOrCreateMvPartition(PARTITION_ID);
 
-        assertThat(tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIndexId), is(notNullValue()));
+        assertThat(tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id()), is(notNullValue()));
     }
 
     /**
@@ -152,12 +178,12 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
      */
     @Test
     public void testCreateHashIndex() {
-        assertThrows(StorageException.class, () -> tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIndexId));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id()));
 
         // Index should only be available after the associated partition has been created.
         tableStorage.getOrCreateMvPartition(PARTITION_ID);
 
-        assertThat(tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIndexId), is(notNullValue()));
+        assertThat(tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id()), is(notNullValue()));
     }
 
     /**
@@ -167,24 +193,24 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
     public void testDestroyIndex() {
         tableStorage.getOrCreateMvPartition(PARTITION_ID);
 
-        assertThat(tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIndexId), is(notNullValue()));
-        assertThat(tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIndexId), is(notNullValue()));
+        assertThat(tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id()), is(notNullValue()));
+        assertThat(tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id()), is(notNullValue()));
 
-        assertThat(tableStorage.destroyIndex(sortedIndexId), willCompleteSuccessfully());
-        assertThat(tableStorage.destroyIndex(hashIndexId), willCompleteSuccessfully());
+        assertThat(tableStorage.destroyIndex(sortedIdx.id()), willCompleteSuccessfully());
+        assertThat(tableStorage.destroyIndex(hashIdx.id()), willCompleteSuccessfully());
     }
 
     @Test
     public void testHashIndexIndependence() {
         MvPartitionStorage partitionStorage1 = tableStorage.getOrCreateMvPartition(PARTITION_ID);
 
-        assertThat(tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIndexId), is(notNullValue()));
-        assertThrows(StorageException.class, () -> tableStorage.getOrCreateHashIndex(PARTITION_ID + 1, hashIndexId));
+        assertThat(tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id()), is(notNullValue()));
+        assertThrows(StorageException.class, () -> tableStorage.getOrCreateHashIndex(PARTITION_ID + 1, hashIdx.id()));
 
         MvPartitionStorage partitionStorage2 = tableStorage.getOrCreateMvPartition(PARTITION_ID + 1);
 
-        HashIndexStorage storage1 = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIndexId);
-        HashIndexStorage storage2 = tableStorage.getOrCreateHashIndex(PARTITION_ID + 1, hashIndexId);
+        HashIndexStorage storage1 = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
+        HashIndexStorage storage2 = tableStorage.getOrCreateHashIndex(PARTITION_ID + 1, hashIdx.id());
 
         assertThat(storage1, is(notNullValue()));
         assertThat(storage2, is(notNullValue()));
@@ -219,7 +245,7 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
         assertThat(getAll(storage1.get(tuple)), contains(rowId1));
         assertThat(getAll(storage2.get(tuple)), contains(rowId2));
 
-        assertThat(tableStorage.destroyIndex(sortedIndexId), willCompleteSuccessfully());
+        assertThat(tableStorage.destroyIndex(sortedIdx.id()), willCompleteSuccessfully());
     }
 
     /**
@@ -229,14 +255,14 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
     public void testMisconfiguredIndices() {
         Exception e = assertThrows(
                 StorageException.class,
-                () -> tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIndexId)
+                () -> tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id())
         );
 
         assertThat(e.getMessage(), is("Partition ID " + PARTITION_ID + " does not exist"));
 
         e = assertThrows(
                 StorageException.class,
-                () -> tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIndexId)
+                () -> tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id())
         );
 
         assertThat(e.getMessage(), is("Partition ID " + PARTITION_ID + " does not exist"));
@@ -254,25 +280,61 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
 
         e = assertThrows(
                 StorageException.class,
-                () -> tableStorage.getOrCreateHashIndex(PARTITION_ID, sortedIndexId)
+                () -> tableStorage.getOrCreateHashIndex(PARTITION_ID, sortedIdx.id())
         );
 
         assertThat(
                 e.getMessage(),
-                is(String.format("Index \"%s\" is not configured as a Hash Index. Actual type: SORTED", sortedIndexId))
+                is(String.format("Index \"%s\" is not configured as a Hash Index. Actual type: SORTED", sortedIdx.id()))
         );
 
         e = assertThrows(
                 StorageException.class,
-                () -> tableStorage.getOrCreateSortedIndex(PARTITION_ID, hashIndexId)
+                () -> tableStorage.getOrCreateSortedIndex(PARTITION_ID, hashIdx.id())
         );
 
         assertThat(
                 e.getMessage(),
-                is(String.format("Index \"%s\" is not configured as a Sorted Index. Actual type: HASH", hashIndexId))
+                is(String.format("Index \"%s\" is not configured as a Sorted Index. Actual type: HASH", hashIdx.id()))
         );
     }
 
+    private void startTestRegistry() {
+        confRegistry = new ConfigurationRegistry(
+                List.of(TablesConfiguration.KEY),
+                Map.of(),
+                new TestConfigurationStorage(DISTRIBUTED),
+                List.of(),
+                List.of(
+                        HashIndexConfigurationSchema.class,
+                        SortedIndexConfigurationSchema.class,
+                        UnknownDataStorageConfigurationSchema.class,
+                        ConstantValueDefaultConfigurationSchema.class,
+                        FunctionCallDefaultConfigurationSchema.class,
+                        NullValueDefaultConfigurationSchema.class,
+                        UnlimitedBudgetConfigurationSchema.class,
+                        EntryCountBudgetConfigurationSchema.class
+                )
+        );
+
+        confRegistry.start();
+    }
+
+    private void createTestIndexes() {
+        CompletableFuture<Void> indexCreateFut = confRegistry.getConfiguration(TablesConfiguration.KEY).indexes().change(ch -> {
+            List.of(SchemaBuilders.sortedIndex(SORTED_INDEX_NAME)
+                            .addIndexColumn("COLUMN0").done()
+                            .build(),
+                    SchemaBuilders.hashIndex(HASH_INDEX_NAME)
+                            .withColumns("COLUMN0")
+                            .build()
+            ).forEach(idxDef -> ch.create(idxDef.name(), c ->
+                    SchemaConfigurationConverter.addIndex(idxDef, UUID.randomUUID(), c)));
+        });
+
+        assertThat(indexCreateFut, willCompleteSuccessfully());
+    }
+
     private void createTestTable() {
         TableDefinition tableDefinition = SchemaBuilders.tableBuilder("PUBLIC", "TEST")
                 .columns(
@@ -282,22 +344,13 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
                 .withPrimaryKey("ID")
                 .build();
 
-        CompletableFuture<Void> createTableFuture = tableStorage.configuration()
-                .change(tableChange -> SchemaConfigurationConverter.convert(tableDefinition, tableChange));
+        tableConfig.change(tblChg -> SchemaConfigurationConverter.convert(tableDefinition, tblChg));
 
-        assertThat(createTableFuture, willCompleteSuccessfully());
-
-        CompletableFuture<Void> indexCreateFut = tableStorage.configuration().change(tblCh ->
-                List.of(SchemaBuilders.sortedIndex(SORTED_INDEX_NAME)
-                                .addIndexColumn("COLUMN0").done()
-                                .build(),
-                        SchemaBuilders.hashIndex(HASH_INDEX_NAME)
-                                .withColumns("COLUMN0")
-                                .build()
-                ).forEach(idxDef -> SchemaConfigurationConverter.addIndex(idxDef, tblCh))
-        );
+        CompletableFuture<Void> createTableFuture = confRegistry.getConfiguration(TablesConfiguration.KEY).tables()
+                .change(chg -> chg.create(tableDefinition.canonicalName(),
+                        tblChg -> SchemaConfigurationConverter.convert(tableDefinition, tblChg)));
 
-        assertThat(indexCreateFut, willCompleteSuccessfully());
+        assertThat(createTableFuture, willCompleteSuccessfully());
     }
 
     private static <T> List<T> getAll(Cursor<T> cursor) {
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvTableStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvTableStorage.java
index cab454a087..b4171d335a 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvTableStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvTableStorage.java
@@ -22,6 +22,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
@@ -37,7 +38,7 @@ import org.jetbrains.annotations.Nullable;
  * Test table storage implementation.
  */
 public class TestConcurrentHashMapMvTableStorage implements MvTableStorage {
-    private final TableConfiguration tableConfig;
+    private final TableConfiguration tableCfg;
 
     private final Map<Integer, MvPartitionStorage> partitions = new ConcurrentHashMap<>();
 
@@ -45,6 +46,8 @@ public class TestConcurrentHashMapMvTableStorage implements MvTableStorage {
 
     private final Map<UUID, HashIndices> hashIndicesById = new ConcurrentHashMap<>();
 
+    private final TablesConfiguration tablesCfg;
+
     /**
      * Class for storing Sorted Indices for a particular partition.
      */
@@ -79,8 +82,10 @@ public class TestConcurrentHashMapMvTableStorage implements MvTableStorage {
         }
     }
 
-    public TestConcurrentHashMapMvTableStorage(TableConfiguration tableCfg) {
-        this.tableConfig = tableCfg;
+    /** Costructor. */
+    public TestConcurrentHashMapMvTableStorage(TableConfiguration tableCfg, TablesConfiguration tablesCfg) {
+        this.tableCfg = tableCfg;
+        this.tablesCfg = tablesCfg;
     }
 
     @Override
@@ -114,7 +119,7 @@ public class TestConcurrentHashMapMvTableStorage implements MvTableStorage {
 
         SortedIndices sortedIndices = sortedIndicesById.computeIfAbsent(
                 indexId,
-                id -> new SortedIndices(new SortedIndexDescriptor(id, tableConfig.value()))
+                id -> new SortedIndices(new SortedIndexDescriptor(id, tableCfg.value(), tablesCfg.value()))
         );
 
         return sortedIndices.getOrCreateStorage(partitionId);
@@ -128,7 +133,7 @@ public class TestConcurrentHashMapMvTableStorage implements MvTableStorage {
 
         HashIndices sortedIndices = hashIndicesById.computeIfAbsent(
                 indexId,
-                id -> new HashIndices(new HashIndexDescriptor(id, tableConfig.value()))
+                id -> new HashIndices(new HashIndexDescriptor(id, tableCfg.value(), tablesCfg.value()))
         );
 
         return sortedIndices.getOrCreateStorage(partitionId);
@@ -154,7 +159,7 @@ public class TestConcurrentHashMapMvTableStorage implements MvTableStorage {
 
     @Override
     public TableConfiguration configuration() {
-        return tableConfig;
+        return tableCfg;
     }
 
     @Override
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapStorageEngine.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapStorageEngine.java
index ca64fb627e..4efe39e9c5 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapStorageEngine.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapStorageEngine.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.storage.chm;
 
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.engine.StorageEngine;
@@ -44,9 +45,9 @@ public class TestConcurrentHashMapStorageEngine implements StorageEngine {
 
     /** {@inheritDoc} */
     @Override
-    public MvTableStorage createMvTable(TableConfiguration tableCfg) throws StorageException {
+    public MvTableStorage createMvTable(TableConfiguration tableCfg, TablesConfiguration tablesCfg) throws StorageException {
         assert tableCfg.dataStorage().name().value().equals(ENGINE_NAME) : tableCfg.dataStorage().name().value();
 
-        return new TestConcurrentHashMapMvTableStorage(tableCfg);
+        return new TestConcurrentHashMapMvTableStorage(tableCfg, tablesCfg);
     }
 }
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractHashIndexStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractHashIndexStorageTest.java
index 3c6b4b6fbc..e248511584 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractHashIndexStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractHashIndexStorageTest.java
@@ -32,7 +32,8 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
-import org.apache.ignite.configuration.schemas.table.TableIndexView;
+import org.apache.ignite.configuration.schemas.table.TableIndexConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RowId;
@@ -63,13 +64,13 @@ public abstract class AbstractHashIndexStorageTest {
 
     private BinaryTupleRowSerializer serializer;
 
-    protected void initialize(MvTableStorage tableStorage) {
+    protected void initialize(MvTableStorage tableStorage, TablesConfiguration tablesCfg) {
         this.tableStorage = tableStorage;
 
         createTestTable(tableStorage.configuration());
 
         this.partitionStorage = tableStorage.getOrCreateMvPartition(TEST_PARTITION);
-        this.indexStorage = createIndex(tableStorage);
+        this.indexStorage = createIndex(tableStorage, tablesCfg);
         this.serializer = new BinaryTupleRowSerializer(indexStorage.indexDescriptor());
     }
 
@@ -85,7 +86,7 @@ public abstract class AbstractHashIndexStorageTest {
                 column(STR_COLUMN_NAME, ColumnType.string()).asNullable(true).build()
         };
 
-        TableDefinition tableDefinition = tableBuilder("test", "foo")
+        TableDefinition tableDefinition = tableBuilder("schema", "table")
                 .columns(allColumns)
                 .withPrimaryKey(pkColumn.name())
                 .build();
@@ -98,20 +99,20 @@ public abstract class AbstractHashIndexStorageTest {
     /**
      * Configures and creates a storage instance for testing.
      */
-    private static HashIndexStorage createIndex(MvTableStorage tableStorage) {
+    private static HashIndexStorage createIndex(MvTableStorage tableStorage, TablesConfiguration tablesConf) {
         HashIndexDefinition indexDefinition = SchemaBuilders.hashIndex("hashIndex")
                 .withColumns(INT_COLUMN_NAME, STR_COLUMN_NAME)
                 .build();
 
-        CompletableFuture<Void> createIndexFuture = tableStorage.configuration()
-                .change(cfg -> cfg.changeIndices(idxList ->
-                        idxList.create(indexDefinition.name(), idx -> convert(indexDefinition, idx))));
+        CompletableFuture<Void> createIndexFuture =
+                tablesConf.indexes().change(chg -> chg.create(indexDefinition.name(),
+                        idx -> convert(indexDefinition, idx)));
 
         assertThat(createIndexFuture, willCompleteSuccessfully());
 
-        TableIndexView indexConfig = tableStorage.configuration().indices().get(indexDefinition.name()).value();
+        TableIndexConfiguration indexConfig = tablesConf.indexes().get(indexDefinition.name());
 
-        return tableStorage.getOrCreateHashIndex(TEST_PARTITION, indexConfig.id());
+        return tableStorage.getOrCreateHashIndex(TEST_PARTITION, indexConfig.id().value());
     }
 
     /**
@@ -205,7 +206,7 @@ public abstract class AbstractHashIndexStorageTest {
     }
 
     @Test
-    public void testDestroy() throws Exception {
+    public void testDestroy() {
         IndexRow row1 = serializer.serializeRow(new Object[]{ 1, "foo" }, new RowId(TEST_PARTITION));
         IndexRow row2 = serializer.serializeRow(new Object[]{ 1, "foo" }, new RowId(TEST_PARTITION));
         IndexRow row3 = serializer.serializeRow(new Object[]{ 2, "bar" }, new RowId(TEST_PARTITION));
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
index f0d6ed68f8..578ab2b549 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReferenceArray;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
 import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.internal.pagememory.DataRegion;
 import org.apache.ignite.internal.pagememory.PageMemory;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
@@ -43,6 +44,8 @@ import org.jetbrains.annotations.Nullable;
 public abstract class AbstractPageMemoryTableStorage implements MvTableStorage {
     protected final TableConfiguration tableCfg;
 
+    protected TablesConfiguration tablesConfiguration;
+
     protected volatile boolean started;
 
     protected volatile AtomicReferenceArray<AbstractPageMemoryMvPartitionStorage> mvPartitions;
@@ -52,8 +55,9 @@ public abstract class AbstractPageMemoryTableStorage implements MvTableStorage {
      *
      * @param tableCfg Table configuration.
      */
-    protected AbstractPageMemoryTableStorage(TableConfiguration tableCfg) {
+    protected AbstractPageMemoryTableStorage(TableConfiguration tableCfg, TablesConfiguration tablesCfg) {
         this.tableCfg = tableCfg;
+        tablesConfiguration = tablesCfg;
     }
 
     /** {@inheritDoc} */
@@ -117,7 +121,7 @@ public abstract class AbstractPageMemoryTableStorage implements MvTableStorage {
         if (partitionId < 0 || partitionId >= mvPartitions.length()) {
             throw new IllegalArgumentException(S.toString(
                     "Unable to access partition with id outside of configured range",
-                    "table", tableCfg.name().value(), false,
+                    "table", tableCfg.value().name(), false,
                     "partitionId", partitionId, false,
                     "partitions", mvPartitions.length(), false
             ));
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
index 8b486293f4..d462fc58ce 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
@@ -29,6 +29,7 @@ import org.apache.ignite.configuration.notifications.ConfigurationNamedListListe
 import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
 import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.internal.components.LongJvmPauseDetector;
 import org.apache.ignite.internal.fileio.AsyncFileIoFactory;
 import org.apache.ignite.internal.fileio.FileIoFactory;
@@ -186,14 +187,15 @@ public class PersistentPageMemoryStorageEngine implements StorageEngine {
 
     /** {@inheritDoc} */
     @Override
-    public PersistentPageMemoryTableStorage createMvTable(TableConfiguration tableCfg) throws StorageException {
+    public PersistentPageMemoryTableStorage createMvTable(TableConfiguration tableCfg, TablesConfiguration tablesCfg)
+            throws StorageException {
         TableView tableView = tableCfg.value();
 
-        assert tableView.dataStorage().name().equals(ENGINE_NAME) : tableView.dataStorage().name();
+        assert tableView.dataStorage().name().equals(ENGINE_NAME) : tableCfg.dataStorage().name();
 
         PersistentPageMemoryDataStorageView dataStorageView = (PersistentPageMemoryDataStorageView) tableView.dataStorage();
 
-        return new PersistentPageMemoryTableStorage(this, tableCfg, regions.get(dataStorageView.dataRegion()));
+        return new PersistentPageMemoryTableStorage(this, tableCfg, regions.get(dataStorageView.dataRegion()), tablesCfg);
     }
 
     /**
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
index fa57d3ff98..85840e2c42 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
@@ -23,6 +23,7 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
 import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.internal.pagememory.evict.PageEvictionTrackerNoOp;
 import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
 import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
@@ -63,9 +64,10 @@ public class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableSto
     public PersistentPageMemoryTableStorage(
             PersistentPageMemoryStorageEngine engine,
             TableConfiguration tableCfg,
-            PersistentPageMemoryDataRegion dataRegion
+            PersistentPageMemoryDataRegion dataRegion,
+            TablesConfiguration tablesCfg
     ) {
-        super(tableCfg);
+        super(tableCfg, tablesCfg);
 
         this.engine = engine;
         this.dataRegion = dataRegion;
@@ -165,7 +167,8 @@ public class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableSto
                     rowVersionFreeList,
                     indexColumnsFreeList,
                     versionChainTree,
-                    indexMetaTree
+                    indexMetaTree,
+                    tablesConfiguration
             );
         } catch (IgniteInternalCheckedException e) {
             throw new StorageException(
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
index dc0e607b66..ee37d900f7 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
@@ -27,6 +27,7 @@ import org.apache.ignite.configuration.notifications.ConfigurationNamedListListe
 import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
 import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.internal.pagememory.PageMemory;
 import org.apache.ignite.internal.pagememory.configuration.schema.VolatilePageMemoryDataRegionConfiguration;
 import org.apache.ignite.internal.pagememory.configuration.schema.VolatilePageMemoryDataRegionView;
@@ -92,14 +93,15 @@ public class VolatilePageMemoryStorageEngine implements StorageEngine {
 
     /** {@inheritDoc} */
     @Override
-    public VolatilePageMemoryTableStorage createMvTable(TableConfiguration tableCfg) throws StorageException {
+    public VolatilePageMemoryTableStorage createMvTable(TableConfiguration tableCfg, TablesConfiguration tablesCfg)
+            throws StorageException {
         TableView tableView = tableCfg.value();
 
         assert tableView.dataStorage().name().equals(ENGINE_NAME) : tableView.dataStorage().name();
 
         VolatilePageMemoryDataStorageView dataStorageView = (VolatilePageMemoryDataStorageView) tableView.dataStorage();
 
-        return new VolatilePageMemoryTableStorage(tableCfg, regions.get(dataStorageView.dataRegion()));
+        return new VolatilePageMemoryTableStorage(tableCfg, tablesCfg, regions.get(dataStorageView.dataRegion()));
     }
 
     /**
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
index c309a64217..1678056c51 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
@@ -22,6 +22,7 @@ import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
 import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMetaTree;
@@ -41,8 +42,12 @@ public class VolatilePageMemoryTableStorage extends AbstractPageMemoryTableStora
      * @param tableCfg – Table configuration.
      * @param dataRegion – Data region for the table.
      */
-    public VolatilePageMemoryTableStorage(TableConfiguration tableCfg, VolatilePageMemoryDataRegion dataRegion) {
-        super(tableCfg);
+    public VolatilePageMemoryTableStorage(
+            TableConfiguration tableCfg,
+            TablesConfiguration tablesCfg,
+            VolatilePageMemoryDataRegion dataRegion
+    ) {
+        super(tableCfg, tablesCfg);
 
         this.dataRegion = dataRegion;
     }
@@ -61,6 +66,7 @@ public class VolatilePageMemoryTableStorage extends AbstractPageMemoryTableStora
 
         return new VolatilePageMemoryMvPartitionStorage(
                 this,
+                tablesConfiguration,
                 partitionId,
                 versionChainTree,
                 indexMetaTree
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 1a5f40ca47..62690b5242 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -33,6 +33,7 @@ import org.apache.ignite.configuration.schemas.table.HashIndexView;
 import org.apache.ignite.configuration.schemas.table.SortedIndexView;
 import org.apache.ignite.configuration.schemas.table.TableIndexView;
 import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.hlc.HybridTimestamp;
 import org.apache.ignite.internal.pagememory.PageIdAllocator;
 import org.apache.ignite.internal.pagememory.PageMemory;
@@ -85,6 +86,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
 
     protected final IndexMetaTree indexMetaTree;
 
+    private final TablesConfiguration tablesConfiguration;
+
     protected final DataPageReader rowVersionDataPageReader;
 
     protected final ConcurrentMap<UUID, HashIndexStorage> indexes = new ConcurrentHashMap<>();
@@ -105,7 +108,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
             RowVersionFreeList rowVersionFreeList,
             IndexColumnsFreeList indexFreeList,
             VersionChainTree versionChainTree,
-            IndexMetaTree indexMetaTree
+            IndexMetaTree indexMetaTree,
+            TablesConfiguration tablesCfg
     ) {
         this.partitionId = partitionId;
         this.tableStorage = tableStorage;
@@ -116,6 +120,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         this.versionChainTree = versionChainTree;
         this.indexMetaTree = indexMetaTree;
 
+        tablesConfiguration = tablesCfg;
+
         PageMemory pageMemory = tableStorage.dataRegion().pageMemory();
 
         groupId = tableStorage.configuration().value().tableId();
@@ -130,12 +136,12 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         try {
             IgniteCursor<IndexMeta> cursor = indexMetaTree.find(null, null);
 
-            NamedListView<? extends TableIndexView> indicesCfgView = tableStorage.configuration().value().indices();
+            NamedListView<TableIndexView> indexesCfgView = tablesConfiguration.indexes().value();
 
             while (cursor.next()) {
                 IndexMeta indexMeta = cursor.get();
 
-                TableIndexView indexCfgView = getByInternalId(indicesCfgView, indexMeta.id());
+                TableIndexView indexCfgView = getByInternalId(indexesCfgView, indexMeta.id());
 
                 if (indexCfgView instanceof HashIndexView) {
                     createOrRestoreHashIndex(indexMeta);
@@ -164,7 +170,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
     private PageMemoryHashIndexStorage createOrRestoreHashIndex(IndexMeta indexMeta) {
         TableView tableView = tableStorage.configuration().value();
 
-        var indexDescriptor = new HashIndexDescriptor(indexMeta.id(), tableView);
+        var indexDescriptor = new HashIndexDescriptor(indexMeta.id(), tableView, tablesConfiguration.value());
 
         try {
             PageMemory pageMemory = tableStorage.dataRegion().pageMemory();
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
index 20cea5b3f3..a8f53257bf 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.storage.pagememory.mv;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.internal.pagememory.persistence.PartitionMeta;
 import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointListener;
 import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointManager;
@@ -76,9 +77,10 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
             RowVersionFreeList rowVersionFreeList,
             IndexColumnsFreeList indexFreeList,
             VersionChainTree versionChainTree,
-            IndexMetaTree indexMetaTree
+            IndexMetaTree indexMetaTree,
+            TablesConfiguration tablesCfg
     ) {
-        super(partitionId, tableStorage, rowVersionFreeList, indexFreeList, versionChainTree, indexMetaTree);
+        super(partitionId, tableStorage, rowVersionFreeList, indexFreeList, versionChainTree, indexMetaTree, tablesCfg);
 
         checkpointManager = tableStorage.engine().checkpointManager();
         checkpointTimeoutLock = checkpointManager.checkpointTimeoutLock();
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
index 7a0f9ce7aa..3e35dc0add 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.storage.pagememory.mv;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.internal.pagememory.tree.BplusTree;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.StorageException;
@@ -41,6 +42,7 @@ public class VolatilePageMemoryMvPartitionStorage extends AbstractPageMemoryMvPa
      */
     public VolatilePageMemoryMvPartitionStorage(
             VolatilePageMemoryTableStorage tableStorage,
+            TablesConfiguration tablesCfg,
             int partitionId,
             VersionChainTree versionChainTree,
             IndexMetaTree indexMetaTree
@@ -51,7 +53,8 @@ public class VolatilePageMemoryMvPartitionStorage extends AbstractPageMemoryMvPa
                 tableStorage.dataRegion().rowVersionFreeList(),
                 tableStorage.dataRegion().indexColumnsFreeList(),
                 versionChainTree,
-                indexMetaTree
+                indexMetaTree,
+                tablesCfg
         );
     }
 
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/PersistentPageMemoryHashIndexStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/PersistentPageMemoryHashIndexStorageTest.java
index fa2df53ec9..4291582ea9 100644
--- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/PersistentPageMemoryHashIndexStorageTest.java
+++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/PersistentPageMemoryHashIndexStorageTest.java
@@ -18,17 +18,21 @@
 package org.apache.ignite.internal.storage.pagememory.index;
 
 import java.nio.file.Path;
+import org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.ConstantValueDefaultConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.EntryCountBudgetConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.FunctionCallDefaultConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.NullValueDefaultConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.configuration.schemas.table.UnlimitedBudgetConfigurationSchema;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.pagememory.configuration.schema.UnsafeMemoryAllocatorConfigurationSchema;
 import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.schema.configuration.schema.TestDataStorageConfigurationSchema;
 import org.apache.ignite.internal.storage.index.AbstractHashIndexStorageTest;
 import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
 import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryTableStorage;
@@ -68,6 +72,24 @@ class PersistentPageMemoryHashIndexStorageTest extends AbstractHashIndexStorageT
     )
     private TableConfiguration tableCfg;
 
+    @InjectConfiguration(
+            name = "tables",
+            polymorphicExtensions = {
+                    HashIndexConfigurationSchema.class,
+                    PersistentPageMemoryDataStorageConfigurationSchema.class,
+                    HashIndexConfigurationSchema.class,
+                    SortedIndexConfigurationSchema.class,
+                    UnknownDataStorageConfigurationSchema.class,
+                    TestDataStorageConfigurationSchema.class,
+                    ConstantValueDefaultConfigurationSchema.class,
+                    FunctionCallDefaultConfigurationSchema.class,
+                    NullValueDefaultConfigurationSchema.class,
+                    UnlimitedBudgetConfigurationSchema.class,
+                    EntryCountBudgetConfigurationSchema.class
+            }
+    )
+    TablesConfiguration tablesConfig;
+
     private PersistentPageMemoryStorageEngine engine;
 
     private PersistentPageMemoryTableStorage table;
@@ -82,11 +104,11 @@ class PersistentPageMemoryHashIndexStorageTest extends AbstractHashIndexStorageT
 
         engine.start();
 
-        table = engine.createMvTable(tableCfg);
+        table = engine.createMvTable(tableCfg, tablesConfig);
 
         table.start();
 
-        initialize(table);
+        initialize(table, tablesConfig);
     }
 
     @AfterEach
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/VolatilePageMemoryHashIndexStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/VolatilePageMemoryHashIndexStorageTest.java
index aa68e0f4f2..b45cb21917 100644
--- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/VolatilePageMemoryHashIndexStorageTest.java
+++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/VolatilePageMemoryHashIndexStorageTest.java
@@ -17,12 +17,15 @@
 
 package org.apache.ignite.internal.storage.pagememory.index;
 
+import org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.ConstantValueDefaultConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.EntryCountBudgetConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.FunctionCallDefaultConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.NullValueDefaultConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.configuration.schemas.table.UnlimitedBudgetConfigurationSchema;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
@@ -62,6 +65,18 @@ class VolatilePageMemoryHashIndexStorageTest extends AbstractHashIndexStorageTes
     )
     private TableConfiguration tableCfg;
 
+    @InjectConfiguration(polymorphicExtensions = {
+            HashIndexConfigurationSchema.class,
+            SortedIndexConfigurationSchema.class,
+            UnknownDataStorageConfigurationSchema.class,
+            ConstantValueDefaultConfigurationSchema.class,
+            FunctionCallDefaultConfigurationSchema.class,
+            NullValueDefaultConfigurationSchema.class,
+            UnlimitedBudgetConfigurationSchema.class,
+            EntryCountBudgetConfigurationSchema.class
+    })
+    TablesConfiguration tablesConfig;
+
     private VolatilePageMemoryStorageEngine engine;
 
     private VolatilePageMemoryTableStorage table;
@@ -76,11 +91,11 @@ class VolatilePageMemoryHashIndexStorageTest extends AbstractHashIndexStorageTes
 
         engine.start();
 
-        table = engine.createMvTable(tableCfg);
+        table = engine.createMvTable(tableCfg, tablesConfig);
 
         table.start();
 
-        initialize(table);
+        initialize(table, tablesConfig);
     }
 
     @AfterEach
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
index 55670532bb..6da4872060 100644
--- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
+++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
@@ -93,7 +93,7 @@ class PersistentPageMemoryMvPartitionStorageTest extends AbstractPageMemoryMvPar
                 ((PersistentPageMemoryDataStorageView) tableCfg.dataStorage().value()).dataRegion()
         );
 
-        table = engine.createMvTable(tableCfg);
+        table = engine.createMvTable(tableCfg, null);
         table.start();
 
         storage = table.createMvPartitionStorage(PARTITION_ID);
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageTest.java
index c65981d72a..7d09497560 100644
--- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageTest.java
+++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageTest.java
@@ -84,7 +84,7 @@ class VolatilePageMemoryMvPartitionStorageTest extends AbstractPageMemoryMvParti
                 ((VolatilePageMemoryDataStorageView) tableCfg.dataStorage().value()).dataRegion()
         );
 
-        table = engine.createMvTable(tableCfg);
+        table = engine.createMvTable(tableCfg, null);
         table.start();
 
         storage = table.createMvPartitionStorage(PARTITION_ID);
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndices.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndexes.java
similarity index 96%
rename from modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndices.java
rename to modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndexes.java
index 9783e48944..ea6294f6cb 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndices.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndexes.java
@@ -24,12 +24,12 @@ import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
 import org.apache.ignite.internal.storage.index.HashIndexStorage;
 import org.apache.ignite.internal.storage.rocksdb.index.RocksDbHashIndexStorage;
 
-class HashIndices {
+class HashIndexes {
     private final HashIndexDescriptor descriptor;
 
     private final ConcurrentMap<Integer, HashIndexStorage> storages = new ConcurrentHashMap<>();
 
-    HashIndices(HashIndexDescriptor descriptor) {
+    HashIndexes(HashIndexDescriptor descriptor) {
         this.descriptor = descriptor;
     }
 
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
index fc0c8bd425..099d2d6402 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
@@ -31,6 +31,7 @@ import org.apache.ignite.configuration.notifications.ConfigurationNamedListListe
 import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
 import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.storage.StorageException;
@@ -148,7 +149,7 @@ public class RocksDbStorageEngine implements StorageEngine {
 
     /** {@inheritDoc} */
     @Override
-    public RocksDbTableStorage createMvTable(TableConfiguration tableCfg) throws StorageException {
+    public RocksDbTableStorage createMvTable(TableConfiguration tableCfg, TablesConfiguration tablesCfg) throws StorageException {
         TableView tableView = tableCfg.value();
 
         assert tableView.dataStorage().name().equals(ENGINE_NAME) : tableView.dataStorage().name();
@@ -165,6 +166,6 @@ public class RocksDbStorageEngine implements StorageEngine {
             throw new StorageException("Failed to create table store directory for " + tableView.name() + ": " + e.getMessage(), e);
         }
 
-        return new RocksDbTableStorage(this, tablePath, tableCfg, dataRegion);
+        return new RocksDbTableStorage(this, tablePath, tableCfg, dataRegion, tablesCfg);
     }
 }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 3acbde412a..bded4516ee 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
 import java.util.stream.Collectors;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
 import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
@@ -79,6 +80,9 @@ public class RocksDbTableStorage implements MvTableStorage {
     /** Table configuration. */
     private final TableConfiguration tableCfg;
 
+    /** Indexes configuration. */
+    private final TablesConfiguration tablesCfg;
+
     /** Data region for the table. */
     private final RocksDbDataRegion dataRegion;
 
@@ -104,7 +108,7 @@ public class RocksDbTableStorage implements MvTableStorage {
     private volatile AtomicReferenceArray<RocksDbMvPartitionStorage> partitions;
 
     /** Hash Index storages by Index IDs. */
-    private final ConcurrentMap<UUID, HashIndices> hashIndices = new ConcurrentHashMap<>();
+    private final ConcurrentMap<UUID, HashIndexes> hashIndices = new ConcurrentHashMap<>();
 
     /** Busy lock to stop synchronously. */
     final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -124,12 +128,14 @@ public class RocksDbTableStorage implements MvTableStorage {
             RocksDbStorageEngine engine,
             Path tablePath,
             TableConfiguration tableCfg,
-            RocksDbDataRegion dataRegion
+            RocksDbDataRegion dataRegion,
+            TablesConfiguration tablesCfg
     ) {
         this.engine = engine;
         this.tablePath = tablePath;
         this.tableCfg = tableCfg;
         this.dataRegion = dataRegion;
+        this.tablesCfg = tablesCfg;
     }
 
     /**
@@ -219,7 +225,8 @@ public class RocksDbTableStorage implements MvTableStorage {
                         break;
 
                     default:
-                        throw new StorageException("Unidentified column family [name=" + cf.name() + ", table=" + tableCfg.name() + ']');
+                        throw new StorageException("Unidentified column family [name=" + cf.name() + ", table="
+                                + tableCfg.value().name() + ']');
                 }
             }
 
@@ -383,10 +390,10 @@ public class RocksDbTableStorage implements MvTableStorage {
 
     @Override
     public HashIndexStorage getOrCreateHashIndex(int partitionId, UUID indexId) {
-        HashIndices storages = hashIndices.computeIfAbsent(indexId, id -> {
-            var indexDescriptor = new HashIndexDescriptor(indexId, tableCfg.value());
+        HashIndexes storages = hashIndices.computeIfAbsent(indexId, id -> {
+            var indexDescriptor = new HashIndexDescriptor(id, tableCfg.value(), tablesCfg.value());
 
-            return new HashIndices(indexDescriptor);
+            return new HashIndexes(indexDescriptor);
         });
 
         RocksDbMvPartitionStorage partitionStorage = getMvPartition(partitionId);
@@ -401,7 +408,7 @@ public class RocksDbTableStorage implements MvTableStorage {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Void> destroyIndex(UUID indexId) {
-        HashIndices storages = hashIndices.remove(indexId);
+        HashIndexes storages = hashIndices.remove(indexId);
 
         if (storages == null) {
             return CompletableFuture.completedFuture(null);
@@ -427,7 +434,7 @@ public class RocksDbTableStorage implements MvTableStorage {
         if (partId < 0 || partId >= partitions.length()) {
             throw new IllegalArgumentException(S.toString(
                     "Unable to access partition with id outside of configured range",
-                    "table", tableCfg.name().value(), false,
+                    "table", tableCfg.value().name(), false,
                     "partitionId", partId, false,
                     "partitions", partitions.length(), false
             ));
@@ -489,7 +496,7 @@ public class RocksDbTableStorage implements MvTableStorage {
                 );
 
             default:
-                throw new StorageException("Unidentified column family [name=" + cfName + ", table=" + tableCfg.name() + ']');
+                throw new StorageException("Unidentified column family [name=" + cfName + ", table=" + tableCfg.value().name() + ']');
         }
     }
 }
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
index 962c60cb0a..099685d083 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
@@ -86,7 +86,7 @@ public class RocksDbMvPartitionStorageTest extends AbstractMvPartitionStorageTes
 
         engine.start();
 
-        table = engine.createMvTable(tableCfg);
+        table = engine.createMvTable(tableCfg, null);
 
         table.start();
 
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
index c4a944f18a..467d59a703 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -32,7 +32,10 @@ import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchem
 import org.apache.ignite.configuration.schemas.table.NullValueDefaultConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexView;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.configuration.schemas.table.UnlimitedBudgetConfigurationSchema;
+import org.apache.ignite.internal.configuration.ConfigurationRegistry;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.storage.AbstractMvTableStorageTest;
@@ -71,18 +74,25 @@ public class RocksDbMvTableStorageTest extends AbstractMvTableStorageTest {
                     UnlimitedBudgetConfigurationSchema.class
             }
     )
-    private TableConfiguration tableCfg;
+    private TableConfiguration tableCfg0;
 
     @WorkDirectory
     private Path workDir;
 
+    private ConfigurationRegistry confRegistry;
+
+    @Override
+    protected void setUp() {
+        super.tableConfig = tableCfg0;
+    }
+
     @Override
-    protected MvTableStorage tableStorage() {
+    protected MvTableStorage tableStorage(TableIndexView sortedIdx, TableIndexView hashIdx, TablesConfiguration tablesCfg) {
         engine = new RocksDbStorageEngine(rocksDbEngineConfig, workDir);
 
         engine.start();
 
-        MvTableStorage storage = engine.createMvTable(tableCfg);
+        MvTableStorage storage = engine.createMvTable(tableConfig, tablesCfg);
 
         assertThat(storage, is(instanceOf(RocksDbTableStorage.class)));
 
@@ -140,7 +150,7 @@ public class RocksDbMvTableStorageTest extends AbstractMvTableStorageTest {
 
         tableStorage.stop();
 
-        tableStorage = engine.createMvTable(tableCfg);
+        tableStorage = engine.createMvTable(tableConfig, null);
 
         tableStorage.start();
 
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngineTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngineTest.java
index 639b0d30f3..ff6d527f27 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngineTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngineTest.java
@@ -83,7 +83,7 @@ public class RocksDbStorageEngineTest {
                     }
             ) TableConfiguration tableCfg
     ) {
-        MvTableStorage table = engine.createMvTable(tableCfg);
+        MvTableStorage table = engine.createMvTable(tableCfg, null);
 
         table.start();
 
@@ -121,7 +121,7 @@ public class RocksDbStorageEngineTest {
 
         assertThat(engineConfigChangeFuture, willCompleteSuccessfully());
 
-        MvTableStorage table = engine.createMvTable(tableCfg);
+        MvTableStorage table = engine.createMvTable(tableCfg, null);
 
         table.start();
 
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java
index 81b77dd891..2839e05c05 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java
@@ -18,9 +18,14 @@
 package org.apache.ignite.internal.storage.rocksdb.index;
 
 import java.nio.file.Path;
+import org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.ConstantValueDefaultConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.EntryCountBudgetConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.FunctionCallDefaultConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.NullValueDefaultConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.configuration.schemas.table.UnlimitedBudgetConfigurationSchema;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
@@ -59,18 +64,27 @@ public class RocksDbHashIndexStorageTest extends AbstractHashIndexStorageTest {
                             UnlimitedBudgetConfigurationSchema.class
                     },
                     value = "mock.dataStorage.name = " + RocksDbStorageEngine.ENGINE_NAME
-            )
-            TableConfiguration tableCfg
+            ) TableConfiguration tableCfg,
+
+            @InjectConfiguration(polymorphicExtensions = {
+                    HashIndexConfigurationSchema.class,
+                    UnknownDataStorageConfigurationSchema.class,
+                    ConstantValueDefaultConfigurationSchema.class,
+                    FunctionCallDefaultConfigurationSchema.class,
+                    NullValueDefaultConfigurationSchema.class,
+                    UnlimitedBudgetConfigurationSchema.class,
+                    EntryCountBudgetConfigurationSchema.class
+            }) TablesConfiguration tablesConfig
     ) {
         engine = new RocksDbStorageEngine(rocksDbEngineConfig, workDir);
 
         engine.start();
 
-        tableStorage = engine.createMvTable(tableCfg);
+        tableStorage = engine.createMvTable(tableCfg, tablesConfig);
 
         tableStorage.start();
 
-        initialize(tableStorage);
+        initialize(tableStorage, tablesConfig);
     }
 
     @AfterEach
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 73931b8208..e85aca12cb 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -63,10 +63,12 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.ignite.configuration.ConfigurationChangeException;
 import org.apache.ignite.configuration.ConfigurationProperty;
+import org.apache.ignite.configuration.NamedListView;
 import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
 import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
 import org.apache.ignite.configuration.schemas.table.TableChange;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexView;
 import org.apache.ignite.configuration.schemas.table.TableView;
 import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.configuration.validation.ConfigurationValidationException;
@@ -167,6 +169,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
      * TODO: IGNITE-16774 This property and overall approach, access configuration directly through the Metostorage,
      * TODO: will be removed after fix of the issue.
      */
+    @TestOnly
     private final boolean getMetadataLocallyOnly = IgniteSystemProperties.getBoolean("IGNITE_GET_METADATA_LOCALLY_ONLY");
 
     /** Tables configuration. */
@@ -835,7 +838,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
         TableConfiguration tableCfg = tablesCfg.tables().get(name);
 
-        MvTableStorage tableStorage = dataStorageMgr.engine(tableCfg.dataStorage()).createMvTable(tableCfg);
+        MvTableStorage tableStorage = dataStorageMgr.engine(tableCfg.dataStorage()).createMvTable(tableCfg, tablesCfg);
 
         tableStorage.start();
 
@@ -1239,12 +1242,14 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                                 throw new TableNotFoundException(name);
                             }
 
-                            List<String> indicesNames = tableCfg.indices().namedListKeys();
+                            NamedListView<TableIndexView> idxView = tablesCfg.indexes().value();
+
+                            boolean idxFound = idxView.namedListKeys().stream()
+                                    .anyMatch(idx -> idxView.get(idx).tableId().equals(tbl.tableId()));
 
                             //TODO: https://issues.apache.org/jira/browse/IGNITE-17562
                             // Let's drop orphaned indices instantly.
-                            if (!indicesNames.isEmpty()) {
-                                // TODO: https://issues.apache.org/jira/browse/IGNITE-17474 Implement cascade drop for indices.
+                            if (idxFound) {
                                 throw new IgniteException("Can't drop table with indices.");
                             }