You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2023/07/17 15:32:09 UTC

[ignite-3] branch ignite-19499 updated (09b385aec1 -> 52ed45fd90)

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a change to branch ignite-19499
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


    from 09b385aec1 Switch other managers to Catalog events.
     new 61c307717e fixup! Switch TableManager to Catalog events.
     new 849c280d3a revert unrelated changes
     new 9c26b16b96 Fix SchemaManager
     new 52ed45fd90 wip. switching to catalog.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../ignite/internal/catalog/CatalogService.java    |   5 +
 .../internal/catalog/CatalogServiceImpl.java       |  11 ++
 .../apache/ignite/internal/index/IndexManager.java | 207 +--------------------
 .../ignite/internal/index/IndexManagerTest.java    |   2 +-
 .../storage/ItRebalanceDistributedTest.java        |   2 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   4 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   4 +-
 .../ignite/internal/schema/SchemaManager.java      | 119 +++++++-----
 .../internal/sql/engine/SqlQueryProcessor.java     |  39 ++--
 .../sql/engine/exec/ExecutionServiceImpl.java      |   7 +-
 .../sql/engine/schema/CatalogSqlSchemaManager.java |  11 +-
 .../sql/engine/exec/MockedStructuresTest.java      |   4 +-
 .../internal/table/distributed/TableManager.java   |  28 +--
 .../table/distributed/TableManagerTest.java        |   2 +-
 14 files changed, 151 insertions(+), 294 deletions(-)


[ignite-3] 02/04: revert unrelated changes

Posted by am...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-19499
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 849c280d3a6cd1c4c18df8fb0f14529af76e0aac
Author: amashenkov <an...@gmail.com>
AuthorDate: Fri Jul 14 13:10:19 2023 +0300

    revert unrelated changes
---
 .../main/java/org/apache/ignite/internal/index/IndexManager.java   | 7 +++----
 .../org/apache/ignite/internal/table/distributed/TableManager.java | 2 +-
 2 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index ede3febeca..97d94f32bf 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.index;
 
+import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.util.ArrayUtils.STRING_EMPTY_ARRAY;
 
@@ -390,8 +391,7 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
                     });
 
             // TODO: investigate why DropIndexFuture hangs.
-            // return allOf(fireEventFuture, dropIndexFuture);
-            return fireEventFuture;
+            return allOf(fireEventFuture, dropIndexFuture);
         } catch (Throwable th) {
             LOG.warn("Failed to process drop index event.", th);
 
@@ -490,8 +490,7 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
         });
 
         // TODO: investigate why fireEventFuture hangs.
-        // return allOf(createIndexFuture, fireEventFuture);
-        return createIndexFuture;
+         return allOf(createIndexFuture, fireEventFuture);
     }
 
     /**
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 43381a7958..6d4b35ebc7 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1307,7 +1307,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         CompletableFuture<?> eventFut = fireEvent(TableEvent.CREATE, new TableEventParameters(causalityToken, tableId));
 
         // TODO: investigate why createParts and eventFutures hangs.
-        return completedFuture(null);
+        return allOf(createPartsFut, eventFut);
     }
 
     /**


[ignite-3] 03/04: Fix SchemaManager

Posted by am...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-19499
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 9c26b16b964f4b730dc45affd5521e58f9dec406
Author: amashenkov <an...@gmail.com>
AuthorDate: Fri Jul 14 13:11:08 2023 +0300

    Fix SchemaManager
---
 .../ignite/internal/schema/SchemaManager.java      | 72 ++++++++++++++++++----
 1 file changed, 59 insertions(+), 13 deletions(-)

diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index bb24c3d0fc..25e434ec73 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -37,8 +37,12 @@ import java.util.function.Consumer;
 import java.util.function.LongFunction;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.AddColumnEventParameters;
+import org.apache.ignite.internal.catalog.events.AlterColumnEventParameters;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
 import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
+import org.apache.ignite.internal.catalog.events.DropColumnEventParameters;
 import org.apache.ignite.internal.causality.CompletionListener;
 import org.apache.ignite.internal.causality.IncrementalVersionedValue;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -108,6 +112,8 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
     public void start() {
         catalogManager.listen(CatalogEvent.TABLE_CREATE,
                 (params, ex) -> onSchemaChange((CreateTableEventParameters) params).thenApply(ignore -> false));
+        catalogManager.listen(CatalogEvent.TABLE_ALTER,
+                (params, ex) -> onSchemaChange(params).thenApply(ignore -> false));
     }
 
     /**
@@ -117,23 +123,65 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
      * @return A future.
      */
     private CompletableFuture<?> onSchemaChange(CreateTableEventParameters evt) {
-        if (!busyLock.enterBusy()) {
-            return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()));
-        }
+        CatalogTableDescriptor tableDescriptor = evt.tableDescriptor();
 
-        try {
-            CatalogTableDescriptor tableDescriptor = evt.tableDescriptor();
+        int newSchemaVersion = INITIAL_SCHEMA_VERSION;// evt.catalogVersion();
+        int tblId = tableDescriptor.id();
+        String tblName = tableDescriptor.name();
+
+        SchemaDescriptor newSchema = CatalogDescriptorUtils.convert(newSchemaVersion, tableDescriptor);
 
-            int newSchemaVersion = evt.catalogVersion();
-            int tblId = tableDescriptor.id();
-            String tblName = tableDescriptor.name();
+        return onSchemaChange(tblId, tblName, newSchema, evt.causalityToken());
+    }
 
-            if (searchSchemaByVersion(tblId, newSchemaVersion) != null) {
-                return completedFuture(null);
+    /**
+     * Listener of schema configuration changes.
+     *
+     * @param evt Event parameters.
+     * @return A future.
+     */
+    private CompletableFuture<?> onSchemaChange(CatalogEventParameters evt) {
+        try {
+            CatalogTableDescriptor tableDescriptor;
+            int newSchemaVersion;
+
+            if (evt instanceof AddColumnEventParameters) {
+                AddColumnEventParameters params = (AddColumnEventParameters) evt;
+
+                tableDescriptor = catalogManager.table(params.tableId(), params.catalogVersion());
+                newSchemaVersion = registriesVv.latest().get(tableDescriptor.id()).lastSchemaVersion() + 1;
+            } else if (evt instanceof DropColumnEventParameters) {
+                DropColumnEventParameters params = (DropColumnEventParameters) evt;
+
+                tableDescriptor = catalogManager.table(params.tableId(), params.catalogVersion());
+                newSchemaVersion = registriesVv.latest().get(tableDescriptor.id()).lastSchemaVersion() + 1;
+            } else if (evt instanceof AlterColumnEventParameters) {
+                AlterColumnEventParameters params = (AlterColumnEventParameters) evt;
+
+                tableDescriptor = catalogManager.table(params.tableId(), params.catalogVersion());
+                newSchemaVersion = registriesVv.latest().get(tableDescriptor.id()).lastSchemaVersion() + 1;
+            } else {
+                return completedFuture(new UnsupportedOperationException("Unexpected event."));
             }
 
             SchemaDescriptor newSchema = CatalogDescriptorUtils.convert(newSchemaVersion, tableDescriptor);
 
+            return onSchemaChange(tableDescriptor.id(), tableDescriptor.name(), newSchema, evt.causalityToken());
+        } catch (Throwable th) {
+            th.printStackTrace();
+            return failedFuture(th);
+        }
+    }
+
+    /**
+     * Update schema.
+     */
+    private CompletableFuture<?> onSchemaChange(int tblId, String tblName, SchemaDescriptor newSchema, long causalityToken) {
+        if (!busyLock.enterBusy()) {
+            return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()));
+        }
+
+        try {
             // This is intentionally a blocking call to enforce configuration listener execution order. Unfortunately it is not possible
             // to execute this method asynchronously, because the schema descriptor is needed to fire the CREATE event as a synchronous part
             // of the configuration listener.
@@ -147,8 +195,6 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
                 return failedFuture(e);
             }
 
-            long causalityToken = evt.causalityToken();
-
             // Fire event early, because dependent listeners have to register VersionedValues' update futures
             var eventParams = new SchemaEventParameters(causalityToken, tblId, newSchema);
 
@@ -162,7 +208,7 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
             return registriesVv.update(causalityToken, (registries, e) -> inBusyLock(busyLock, () -> {
                 if (e != null) {
                     return failedFuture(new IgniteInternalException(IgniteStringFormatter.format(
-                            "Cannot create a schema for the table [tblId={}, ver={}]", tblId, newSchemaVersion), e)
+                            "Cannot create a schema for the table [tblId={}, ver={}]", tblId, newSchema.version()), e)
                     );
                 }
 


[ignite-3] 04/04: wip. switching to catalog.

Posted by am...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-19499
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 52ed45fd904550cf1adcb26c2815b036ffbf2ba0
Author: amashenkov <an...@gmail.com>
AuthorDate: Mon Jul 17 18:31:59 2023 +0300

    wip. switching to catalog.
---
 .../ignite/internal/catalog/CatalogService.java    |   3 +
 .../apache/ignite/internal/index/IndexManager.java | 202 +--------------------
 .../ignite/internal/index/IndexManagerTest.java    |   2 +-
 .../storage/ItRebalanceDistributedTest.java        |   2 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   4 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   4 +-
 .../ignite/internal/schema/SchemaManager.java      |  57 ++----
 .../internal/sql/engine/SqlQueryProcessor.java     |  39 ++--
 .../sql/engine/exec/ExecutionServiceImpl.java      |   7 +-
 .../sql/engine/schema/CatalogSqlSchemaManager.java |  11 +-
 .../sql/engine/exec/MockedStructuresTest.java      |   4 +-
 .../internal/table/distributed/TableManager.java   |  28 +--
 .../table/distributed/TableManagerTest.java        |   2 +-
 13 files changed, 82 insertions(+), 283 deletions(-)

diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
index 82c61d276e..880e2e2767 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
@@ -42,12 +42,14 @@ public interface CatalogService {
 
     CatalogTableDescriptor table(String tableName, long timestamp);
 
+    @Deprecated
     CatalogTableDescriptor table(int tableId, long timestamp);
 
     CatalogTableDescriptor table(int tableId, int catalogVersion);
 
     CatalogIndexDescriptor index(String indexName, long timestamp);
 
+    @Deprecated
     CatalogIndexDescriptor index(int indexId, long timestamp);
 
     CatalogSchemaDescriptor schema(int version);
@@ -56,6 +58,7 @@ public interface CatalogService {
 
     CatalogZoneDescriptor zone(String zoneName, long timestamp);
 
+    @Deprecated
     CatalogZoneDescriptor zone(int zoneId, long timestamp);
 
     CatalogZoneDescriptor zone(int zoneId, int version);
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 97d94f32bf..e4a1c47af4 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.commands.AbstractIndexCommandParams;
@@ -55,13 +54,6 @@ import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaManager;
 import org.apache.ignite.internal.schema.SchemaRegistry;
-import org.apache.ignite.internal.schema.configuration.TableView;
-import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
-import org.apache.ignite.internal.schema.configuration.index.HashIndexChange;
-import org.apache.ignite.internal.schema.configuration.index.SortedIndexChange;
-import org.apache.ignite.internal.schema.configuration.index.TableIndexChange;
-import org.apache.ignite.internal.schema.configuration.index.TableIndexConfiguration;
-import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
 import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
 import org.apache.ignite.internal.storage.index.StorageIndexDescriptor;
 import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
@@ -69,15 +61,8 @@ import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.distributed.PartitionSet;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-import org.apache.ignite.internal.util.StringUtils;
-import org.apache.ignite.lang.ErrorGroups;
-import org.apache.ignite.lang.ErrorGroups.Common;
 import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.lang.IndexAlreadyExistsException;
-import org.apache.ignite.lang.IndexNotFoundException;
 import org.apache.ignite.lang.NodeStoppingException;
-import org.apache.ignite.lang.TableNotFoundException;
 
 /**
  * An Ignite component that is responsible for handling index-related commands like CREATE or DROP
@@ -87,9 +72,6 @@ import org.apache.ignite.lang.TableNotFoundException;
 public class IndexManager extends Producer<IndexEvent, IndexEventParameters> implements IgniteComponent {
     private static final IgniteLogger LOG = Loggers.forClass(IndexManager.class);
 
-    /** Common tables and indexes configuration. */
-    private final TablesConfiguration tablesCfg;
-
     /** Schema manager. */
     private final SchemaManager schemaManager;
 
@@ -108,18 +90,15 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
     /**
      * Constructor.
      *
-     * @param tablesCfg Tables and indexes configuration.
      * @param catalogManager Catalog manager.
      * @param schemaManager Schema manager.
      * @param tableManager Table manager.
      */
     public IndexManager(
-            TablesConfiguration tablesCfg,
             CatalogManager catalogManager,
             SchemaManager schemaManager,
             TableManager tableManager
     ) {
-        this.tablesCfg = Objects.requireNonNull(tablesCfg, "tablesCfg");
         this.schemaManager = Objects.requireNonNull(schemaManager, "schemaManager");
         this.tableManager = tableManager;
         this.catalogManager = catalogManager;
@@ -168,115 +147,7 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
                     : catalogManager.createIndex((CreateHashIndexParams) params);
 
             return indexCreateFuture.thenApply(ignore -> catalogManager.index(params.indexName(), Long.MAX_VALUE))
-                    .thenCompose(index -> createIndexInternal(
-                            index.id(),
-                            params.schemaName(),
-                            params.indexName(),
-                            params.tableName(),
-                            true,
-                            createIndexChanger(params)
-                    ));
-        } finally {
-            busyLock.leaveBusy();
-        }
-    }
-
-    @Deprecated(forRemoval = true)
-    private Consumer<TableIndexChange> createIndexChanger(AbstractIndexCommandParams indexParams) {
-        if (indexParams instanceof CreateSortedIndexParams) {
-            CreateSortedIndexParams params = (CreateSortedIndexParams) indexParams;
-            return tableIndexChange -> tableIndexChange.convert(SortedIndexChange.class).changeColumns(colsInit -> {
-                for (int i = 0; i < params.columns().size(); i++) {
-                    String columnName = params.columns().get(i);
-                    CatalogColumnCollation collation = params.collations().get(i);
-                    //TODO: https://issues.apache.org/jira/browse/IGNITE-17563 Pass null ordering for columns.
-                    colsInit.create(columnName, colInit -> colInit.changeAsc(collation.asc()));
-                }
-            }).changeUniq(params.unique());
-        } else {
-            CreateHashIndexParams params = (CreateHashIndexParams) indexParams;
-            return tableIndexChange -> tableIndexChange.convert(HashIndexChange.class)
-                    .changeColumnNames(params.columns().toArray(STRING_EMPTY_ARRAY))
-                    .changeUniq(params.unique());
-        }
-    }
-
-    @Deprecated(forRemoval = true)
-    private CompletableFuture<Boolean> createIndexInternal(
-            int indexId,
-            String schemaName,
-            String indexName,
-            String tableName,
-            boolean failIfExists,
-            Consumer<TableIndexChange> indexChange
-    ) {
-        if (!busyLock.enterBusy()) {
-            return failedFuture(new NodeStoppingException());
-        }
-
-        LOG.debug("Going to create index [schema={}, table={}, index={}]", schemaName, tableName, indexName);
-
-        try {
-            validateName(indexName);
-
-            CompletableFuture<Boolean> future = new CompletableFuture<>();
-
-            // Check index existence flag, avoid usage of hasCause + IndexAlreadyExistsException.
-            AtomicBoolean idxExist = new AtomicBoolean(false);
-
-            tablesCfg.change(tablesChange -> tablesChange.changeIndexes(indexListChange -> {
-                idxExist.set(false);
-
-                if (indexListChange.get(indexName) != null) {
-                    idxExist.set(true);
-
-                    throw new IndexAlreadyExistsException(schemaName, indexName);
-                }
-
-                TableView tableCfg = tablesChange.tables().get(tableName);
-
-                if (tableCfg == null) {
-                    throw new TableNotFoundException(schemaName, tableName);
-                }
-
-                int tableId = tableCfg.id();
-
-                tablesChange.changeGlobalIdCounter(indexId);
-
-                Consumer<TableIndexChange> chg = indexChange.andThen(c -> c.changeTableId(tableId).changeId(indexId));
-
-                indexListChange.create(indexName, chg);
-            })).whenComplete((index, th) -> {
-                if (th != null) {
-                    LOG.debug("Unable to create index [schema={}, table={}, index={}]",
-                            th, schemaName, tableName, indexName);
-
-                    if (!failIfExists && idxExist.get()) {
-                        future.complete(false);
-                    } else {
-                        future.completeExceptionally(th);
-                    }
-                } else {
-                    TableIndexConfiguration idxCfg = tablesCfg.indexes().get(indexName);
-
-                    if (idxCfg != null && idxCfg.value() != null) {
-                        LOG.info("Index created [schema={}, table={}, index={}, indexId={}]",
-                                schemaName, tableName, indexName, idxCfg.id().value());
-
-                        future.complete(true);
-                    } else {
-                        var exception = new IgniteInternalException(
-                                Common.INTERNAL_ERR, "Looks like the index was concurrently deleted");
-
-                        LOG.info("Unable to create index [schema={}, table={}, index={}]",
-                                exception, schemaName, tableName, indexName);
-
-                        future.completeExceptionally(exception);
-                    }
-                }
-            });
-
-            return future;
+                    .thenApply(ignore -> false);
         } finally {
             busyLock.leaveBusy();
         }
@@ -291,75 +162,12 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
         }
         try {
             return catalogManager.dropIndex(params)
-                    .thenCompose(ignore -> dropIndexAsyncInternal(params.schemaName(), params.indexName(), true));
-        } finally {
-            busyLock.leaveBusy();
-        }
-    }
-
-    @Deprecated(forRemoval = true)
-    private CompletableFuture<Boolean> dropIndexAsyncInternal(
-            String schemaName,
-            String indexName,
-            boolean failIfNotExists
-    ) {
-        if (!busyLock.enterBusy()) {
-            return failedFuture(new NodeStoppingException());
-        }
-
-        LOG.debug("Going to drop index [schema={}, index={}]", schemaName, indexName);
-
-        try {
-            validateName(indexName);
-
-            final CompletableFuture<Boolean> future = new CompletableFuture<>();
-
-            // Check index existence flag, avoid usage of hasCause + IndexAlreadyExistsException.
-            AtomicBoolean idxOrTblNotExist = new AtomicBoolean(false);
-
-            tablesCfg.indexes().change(indexListChange -> {
-                idxOrTblNotExist.set(false);
-
-                TableIndexView idxView = indexListChange.get(indexName);
-
-                if (idxView == null) {
-                    idxOrTblNotExist.set(true);
-
-                    throw new IndexNotFoundException(schemaName, indexName);
-                }
-
-                indexListChange.delete(indexName);
-            }).whenComplete((ignored, th) -> {
-                if (th != null) {
-                    LOG.info("Unable to drop index [schema={}, index={}]", th, schemaName, indexName);
-
-                    if (!failIfNotExists && idxOrTblNotExist.get()) {
-                        future.complete(false);
-                    } else {
-                        future.completeExceptionally(th);
-                    }
-                } else {
-                    LOG.info("Index dropped [schema={}, index={}]", schemaName, indexName);
-
-                    future.complete(true);
-                }
-            });
-
-            return future;
+                    .thenApply(ignore -> false);
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    private void validateName(String indexName) {
-        if (StringUtils.nullOrEmpty(indexName)) {
-            throw new IgniteInternalException(
-                    ErrorGroups.Index.INVALID_INDEX_DEFINITION_ERR,
-                    "Index name should be at least 1 character long"
-            );
-        }
-    }
-
     /**
      * Callback method is called when index configuration changed and an index was dropped.
      *
@@ -383,7 +191,8 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
         try {
             CompletableFuture<?> fireEventFuture = fireEvent(IndexEvent.DROP, new IndexEventParameters(causalityToken, tableId, idxId));
 
-            CompletableFuture<?> dropIndexFuture = tableManager.tableAsync(causalityToken, tableId)
+            catalogManager.table(tableId, evt.catalogVersion());
+            CompletableFuture<?> dropIndexFuture = tableManager.tableAsync(tableId)
                     .thenAccept(table -> {
                         if (table != null) { // in case of DROP TABLE the table will be removed first
                             table.unregisterIndex(idxId);
@@ -490,7 +299,8 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
         });
 
         // TODO: investigate why fireEventFuture hangs.
-         return allOf(createIndexFuture, fireEventFuture);
+//         return allOf(createIndexFuture, fireEventFuture);
+         return createIndexFuture;
     }
 
     /**
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
index 53b2abf6d2..f0e0cb527e 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
@@ -120,7 +120,7 @@ public class IndexManagerTest {
         when(catalogManager.index(anyString(), anyLong())).thenReturn(indexDescriptor);
         when(indexDescriptor.id()).thenReturn(1);
 
-        indexManager = new IndexManager(tablesConfig, catalogManager, schManager, tableManagerMock);
+        indexManager = new IndexManager(catalogManager, schManager, tableManagerMock);
         indexManager.start();
 
         assertThat(
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index 147731282b..0db166eb30 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -783,7 +783,7 @@ public class ItRebalanceDistributedTest {
                     clockWaiter
             );
 
-            schemaManager = new SchemaManager(registry, tablesCfg, metaStorageManager, catalogManager);
+            schemaManager = new SchemaManager(registry, metaStorageManager, catalogManager);
 
             distributionZoneManager = new DistributionZoneManager(
                     zonesCfg,
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index e39467f816..1e3141f6c5 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -345,7 +345,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
                 clockWaiter
         );
 
-        SchemaManager schemaManager = new SchemaManager(registry, tablesConfig, metaStorageMgr, catalogManager);
+        SchemaManager schemaManager = new SchemaManager(registry, metaStorageMgr, catalogManager);
 
         DistributionZoneManager distributionZoneManager = new DistributionZoneManager(
                 zonesConfig,
@@ -385,7 +385,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
                 null
         );
 
-        var indexManager = new IndexManager(tablesConfig, catalogManager, schemaManager, tableManager);
+        var indexManager = new IndexManager(catalogManager, schemaManager, tableManager);
 
         SqlQueryProcessor qryEngine = new SqlQueryProcessor(
                 registry,
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 66ff6471e0..c177c7da43 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -505,7 +505,7 @@ public class IgniteImpl implements Ignite {
                 () -> schemaSyncConfig.delayDuration().value()
         );
 
-        schemaManager = new SchemaManager(registry, tablesConfig, metaStorageMgr, catalogManager);
+        schemaManager = new SchemaManager(registry, metaStorageMgr, catalogManager);
 
         distributionZoneManager = new DistributionZoneManager(
                 zonesConfiguration,
@@ -549,7 +549,7 @@ public class IgniteImpl implements Ignite {
                 distributionZoneManager
         );
 
-        indexManager = new IndexManager(tablesConfig, catalogManager, schemaManager, distributedTblMgr);
+        indexManager = new IndexManager(catalogManager, schemaManager, distributedTblMgr);
 
         qryEngine = new SqlQueryProcessor(
                 registry,
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index 25e434ec73..dca6103f27 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -51,8 +51,6 @@ import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.manager.Producer;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.internal.schema.configuration.TableConfiguration;
-import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.schema.event.SchemaEvent;
 import org.apache.ignite.internal.schema.event.SchemaEventParameters;
 import org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl;
@@ -83,9 +81,6 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
     /** Prevents double stopping of the component. */
     private final AtomicBoolean stopGuard = new AtomicBoolean();
 
-    /** Tables configuration. */
-    private final TablesConfiguration tablesCfg;
-
     /** Versioned store for tables by name. */
     private final IncrementalVersionedValue<Map<Integer, SchemaRegistryImpl>> registriesVv;
 
@@ -98,11 +93,9 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
     /** Constructor. */
     public SchemaManager(
             Consumer<LongFunction<CompletableFuture<?>>> registry,
-            TablesConfiguration tablesCfg,
             MetaStorageManager metastorageMgr,
             CatalogManager catalogManager) {
         this.registriesVv = new IncrementalVersionedValue<>(registry, HashMap::new);
-        this.tablesCfg = tablesCfg;
         this.catalogManager = catalogManager;
         this.metastorageMgr = metastorageMgr;
     }
@@ -127,11 +120,10 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
 
         int newSchemaVersion = INITIAL_SCHEMA_VERSION;// evt.catalogVersion();
         int tblId = tableDescriptor.id();
-        String tblName = tableDescriptor.name();
 
         SchemaDescriptor newSchema = CatalogDescriptorUtils.convert(newSchemaVersion, tableDescriptor);
 
-        return onSchemaChange(tblId, tblName, newSchema, evt.causalityToken());
+        return onSchemaChange(tblId, newSchema, evt.causalityToken());
     }
 
     /**
@@ -166,7 +158,8 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
 
             SchemaDescriptor newSchema = CatalogDescriptorUtils.convert(newSchemaVersion, tableDescriptor);
 
-            return onSchemaChange(tableDescriptor.id(), tableDescriptor.name(), newSchema, evt.causalityToken());
+            CompletableFuture<?> fut = onSchemaChange(tableDescriptor.id(), newSchema, evt.causalityToken());
+            return completedFuture(false);
         } catch (Throwable th) {
             th.printStackTrace();
             return failedFuture(th);
@@ -176,7 +169,7 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
     /**
      * Update schema.
      */
-    private CompletableFuture<?> onSchemaChange(int tblId, String tblName, SchemaDescriptor newSchema, long causalityToken) {
+    private CompletableFuture<?> onSchemaChange(int tblId, SchemaDescriptor newSchema, long causalityToken) {
         if (!busyLock.enterBusy()) {
             return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()));
         }
@@ -195,24 +188,15 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
                 return failedFuture(e);
             }
 
-            // Fire event early, because dependent listeners have to register VersionedValues' update futures
-            var eventParams = new SchemaEventParameters(causalityToken, tblId, newSchema);
-
-            fireEvent(SchemaEvent.CREATE, eventParams)
-                    .whenComplete((v, e) -> {
+            return registriesVv.update(causalityToken,
+                    (registries, e) -> inBusyLock(busyLock, () -> {
                         if (e != null) {
-                            LOGGER.warn("Error when processing CREATE event", e);
+                            return failedFuture(new IgniteInternalException(IgniteStringFormatter.format(
+                                    "Cannot create a schema for the table [tblId={}, ver={}]", tblId, newSchema.version()), e)
+                            );
                         }
-                    });
 
-            return registriesVv.update(causalityToken, (registries, e) -> inBusyLock(busyLock, () -> {
-                if (e != null) {
-                    return failedFuture(new IgniteInternalException(IgniteStringFormatter.format(
-                            "Cannot create a schema for the table [tblId={}, ver={}]", tblId, newSchema.version()), e)
-                    );
-                }
-
-                return registerSchema(registries, tblId, tblName, newSchema);
+                return registerSchema(registries, tblId, newSchema);
             }));
         } finally {
             busyLock.leaveBusy();
@@ -242,7 +226,6 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
      *
      * @param registries Map of schema registries.
      * @param tableId Table id.
-     * @param tableName Table name.
      * @param schema Schema descriptor.
      * @return Future that, when complete, will resolve into an updated map of schema registries
      *     (to be used in {@link IncrementalVersionedValue#update}).
@@ -250,7 +233,6 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
     private CompletableFuture<Map<Integer, SchemaRegistryImpl>> registerSchema(
             Map<Integer, SchemaRegistryImpl> registries,
             int tableId,
-            String tableName,
             SchemaDescriptor schema
     ) {
         ByteArray key = schemaWithVerHistKey(tableId, schema.version());
@@ -264,7 +246,7 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
                     if (reg == null) {
                         Map<Integer, SchemaRegistryImpl> copy = new HashMap<>(registries);
 
-                        copy.put(tableId, createSchemaRegistry(tableId, tableName, schema));
+                        copy.put(tableId, createSchemaRegistry(tableId, schema));
 
                         return copy;
                     } else {
@@ -279,13 +261,12 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
      * Create schema registry for the table.
      *
      * @param tableId Table id.
-     * @param tableName Table name.
      * @param initialSchema Initial schema for the registry.
      * @return Schema registry.
      */
-    private SchemaRegistryImpl createSchemaRegistry(int tableId, String tableName, SchemaDescriptor initialSchema) {
+    private SchemaRegistryImpl createSchemaRegistry(int tableId, SchemaDescriptor initialSchema) {
         return new SchemaRegistryImpl(
-                ver -> inBusyLock(busyLock, () -> tableSchema(tableId, tableName, ver)),
+                ver -> inBusyLock(busyLock, () -> tableSchema(tableId, ver)),
                 () -> inBusyLock(busyLock, () -> latestSchemaVersion(tableId)),
                 initialSchema
         );
@@ -298,15 +279,13 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
      * @param schemaVer Schema version.
      * @return Schema descriptor.
      */
-    private CompletableFuture<SchemaDescriptor> tableSchema(int tblId, String tableName, int schemaVer) {
-        TableConfiguration tblCfg = tablesCfg.tables().get(tableName);
-
+    private CompletableFuture<SchemaDescriptor> tableSchema(int tblId, int schemaVer) {
         CompletableFuture<SchemaDescriptor> fut = new CompletableFuture<>();
 
         SchemaRegistry registry = registriesVv.latest().get(tblId);
 
         if (registry.lastSchemaVersion() > schemaVer) {
-            return getSchemaDescriptor(schemaVer, tblCfg);
+            return getSchemaDescriptor(schemaVer, tblId);
         }
 
         CompletionListener<Map<Integer, SchemaRegistryImpl>> schemaListener = (token, regs, e) -> {
@@ -379,11 +358,11 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
      * Gets a schema descriptor from the configuration storage.
      *
      * @param schemaVer Schema version.
-     * @param tblCfg Table configuration.
+     * @param tblId Table id.
      * @return Schema descriptor.
      */
-    private CompletableFuture<SchemaDescriptor> getSchemaDescriptor(int schemaVer, TableConfiguration tblCfg) {
-        CompletableFuture<Entry> ent = metastorageMgr.get(schemaWithVerHistKey(tblCfg.id().value(), schemaVer));
+    private CompletableFuture<SchemaDescriptor> getSchemaDescriptor(int schemaVer, Integer tblId) {
+        CompletableFuture<Entry> ent = metastorageMgr.get(schemaWithVerHistKey(tblId, schemaVer));
 
         return ent.thenApply(e -> SchemaSerializerImpl.INSTANCE.deserialize(e.value()));
     }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index e2dcbdfaa4..5311fae998 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -43,6 +43,7 @@ import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.index.IndexManager;
@@ -71,6 +72,7 @@ import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
 import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
 import org.apache.ignite.internal.sql.engine.property.PropertiesHelper;
 import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
+import org.apache.ignite.internal.sql.engine.schema.CatalogSqlSchemaManager;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManagerImpl;
 import org.apache.ignite.internal.sql.engine.session.Session;
@@ -243,14 +245,15 @@ public class SqlQueryProcessor implements QueryProcessor {
                 msgSrvc
         ));
 
-        SqlSchemaManagerImpl sqlSchemaManager = new SqlSchemaManagerImpl(
-                tableManager,
-                schemaManager,
-                registry,
-                busyLock
-        );
+//        SqlSchemaManagerImpl sqlSchemaManager = new SqlSchemaManagerImpl(
+//                tableManager,
+//                schemaManager,
+//                registry,
+//                busyLock
+//        );
+        sqlSchemaManager = new CatalogSqlSchemaManager(catalogManager, 1000);
 
-        sqlSchemaManager.registerListener(prepareSvc);
+//        sqlSchemaManager.registerListener(prepareSvc);
 
         this.prepareSvc = prepareSvc;
 
@@ -266,7 +269,7 @@ public class SqlQueryProcessor implements QueryProcessor {
 
         var dependencyResolver = new ExecutionDependencyResolverImpl(executableTableRegistry);
 
-        sqlSchemaManager.registerListener(executableTableRegistry);
+//        sqlSchemaManager.registerListener(executableTableRegistry);
 
         var executionSrvc = registerService(ExecutionServiceImpl.create(
                 clusterSrvc.topologyService(),
@@ -285,12 +288,12 @@ public class SqlQueryProcessor implements QueryProcessor {
 
         this.executionSrvc = executionSrvc;
 
-        registerTableListener(TableEvent.CREATE, new TableCreatedListener(sqlSchemaManager));
-        registerTableListener(TableEvent.ALTER, new TableUpdatedListener(sqlSchemaManager));
-        registerTableListener(TableEvent.DROP, new TableDroppedListener(sqlSchemaManager));
+//        registerTableListener(TableEvent.CREATE, new TableCreatedListener(sqlSchemaManager));
+//        registerTableListener(TableEvent.ALTER, new TableUpdatedListener(sqlSchemaManager));
+//        registerTableListener(TableEvent.DROP, new TableDroppedListener(sqlSchemaManager));
 
-        registerIndexListener(IndexEvent.CREATE, new IndexCreatedListener(sqlSchemaManager));
-        registerIndexListener(IndexEvent.DROP, new IndexDroppedListener(sqlSchemaManager));
+//        registerIndexListener(IndexEvent.CREATE, new IndexCreatedListener(sqlSchemaManager));
+//        registerIndexListener(IndexEvent.DROP, new IndexDroppedListener(sqlSchemaManager));
 
         this.sqlSchemaManager = sqlSchemaManager;
 
@@ -423,7 +426,11 @@ public class SqlQueryProcessor implements QueryProcessor {
         AtomicReference<InternalTransaction> tx = new AtomicReference<>();
 
         CompletableFuture<AsyncSqlCursor<List<Object>>> stage = start
-                .thenCompose(ignored -> {
+                // TODO: wait for latest catalog.
+                .thenCompose(ignore -> tableManager.tablesAsync())
+                .thenCompose(tables -> CompletableFuture.allOf(tables.stream()
+                        .map(t -> tableManager.tableAsync(t.name())).toArray(CompletableFuture[]::new))
+                ).thenCompose(ignored -> {
                     ParsedResult result = parserService.parse(sql);
 
                     validateParsedStatement(context, outerTx, result, params);
@@ -434,7 +441,9 @@ public class SqlQueryProcessor implements QueryProcessor {
 
                     tx.set(implicitTxRequired ? txManager.begin(!rwOp) : outerTx);
 
-                    SchemaPlus schema = sqlSchemaManager.schema(schemaName);
+                    CatalogSchemaDescriptor activeSchema = catalogManager.activeSchema(tx.get().startTimestamp().longValue());
+
+                    SchemaPlus schema = sqlSchemaManager.schema(schemaName, activeSchema.id());
 
                     if (schema == null) {
                         return CompletableFuture.failedFuture(new SchemaNotFoundException(schemaName));
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 4cafab1ead..5f30d9c0dd 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -237,13 +237,13 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
         return queryManager.execute(tx, plan);
     }
 
-    private BaseQueryContext createQueryContext(UUID queryId, @Nullable String schema, Object[] params) {
+    private BaseQueryContext createQueryContext(UUID queryId, @Nullable String schema, int version, Object[] params) {
         return BaseQueryContext.builder()
                 .queryId(queryId)
                 .parameters(params)
                 .frameworkConfig(
                         Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
-                                .defaultSchema(sqlSchemaManager.schema(schema))
+                                .defaultSchema(sqlSchemaManager.schema(schema, version))
                                 .build()
                 )
                 .logger(LOG)
@@ -425,7 +425,8 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
 
     private DistributedQueryManager getOrCreateQueryManager(QueryStartRequest msg) {
         return queryManagerMap.computeIfAbsent(msg.queryId(), key -> {
-            BaseQueryContext ctx = createQueryContext(key, msg.schema(), msg.parameters());
+            // TODO: change the type of the request's schemaVersion to int so this narrowing cast can be removed.
+            BaseQueryContext ctx = createQueryContext(key, msg.schema(), (int) msg.schemaVersion(), msg.parameters());
 
             return new DistributedQueryManager(ctx);
         });
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java
index ed27b83e91..4f75c4e454 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java
@@ -68,10 +68,12 @@ public class CatalogSqlSchemaManager implements SqlSchemaManager {
     }
 
     /** {@inheritDoc} */
+    @Deprecated
     @Override
     public SchemaPlus schema(@Nullable String schema) {
         // Should be removed -schema(name, version) must be used instead
-        throw new UnsupportedOperationException();
+        // throw new UnsupportedOperationException();
+        return activeSchema(schema, catalogManager.activeCatalogVersion(Long.MAX_VALUE));
     }
 
     /** {@inheritDoc} */
@@ -87,7 +89,12 @@ public class CatalogSqlSchemaManager implements SqlSchemaManager {
     @Override
     public IgniteTable tableById(int id) {
         // Should be removed - this method is used to obtain native types from a table.
-        throw new UnsupportedOperationException();
+        //throw new UnsupportedOperationException();
+        CatalogTableDescriptor table = catalogManager.table(id, Long.MAX_VALUE);
+
+        assert table != null;
+
+        return (IgniteTable) activeSchema(DEFAULT_SCHEMA_NAME, Long.MAX_VALUE).getTable(table.name());
     }
 
     /** {@inheritDoc} */
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 454225c5f8..26a355f767 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -281,7 +281,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
 
         dataStorageManager.start();
 
-        schemaManager = new SchemaManager(revisionUpdater, tblsCfg, msm, catalogManager);
+        schemaManager = new SchemaManager(revisionUpdater, msm, catalogManager);
 
         schemaManager.start();
 
@@ -313,7 +313,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
 
         tblManager = mockManagers();
 
-        idxManager = new IndexManager(tblsCfg, catalogManager, schemaManager, tblManager);
+        idxManager = new IndexManager(catalogManager, schemaManager, tblManager);
 
         idxManager.start();
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 6d4b35ebc7..a308b4db21 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -46,6 +46,7 @@ import static org.apache.ignite.internal.utils.RebalanceUtil.extractTableId;
 import static org.apache.ignite.internal.utils.RebalanceUtil.pendingPartAssignmentsKey;
 import static org.apache.ignite.internal.utils.RebalanceUtil.stablePartAssignmentsKey;
 import static org.apache.ignite.lang.ErrorGroups.Sql.DROP_IDX_COLUMN_CONSTRAINT_ERR;
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import java.io.IOException;
@@ -112,7 +113,6 @@ import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.manager.EventListener;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.manager.Producer;
 import org.apache.ignite.internal.metastorage.Entry;
@@ -150,8 +150,6 @@ import org.apache.ignite.internal.schema.configuration.TablesChange;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
 import org.apache.ignite.internal.schema.configuration.storage.DataStorageView;
-import org.apache.ignite.internal.schema.event.SchemaEvent;
-import org.apache.ignite.internal.schema.event.SchemaEventParameters;
 import org.apache.ignite.internal.storage.DataStorageManager;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RowId;
@@ -538,15 +536,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         catalogManager.listen(CatalogEvent.TABLE_DROP,
                 (parameters, exception) -> onTableDelete((DropTableEventParameters) parameters).thenApply(ignore -> false));
 
-        schemaManager.listen(SchemaEvent.CREATE, new EventListener<>() {
-            @Override
-            public CompletableFuture<Boolean> notify(@NotNull SchemaEventParameters parameters, @Nullable Throwable exception) {
-                var eventParameters = new TableEventParameters(parameters.causalityToken(), parameters.tableId());
-
-                return fireEvent(TableEvent.ALTER, eventParameters).thenApply(v -> false);
-            }
-        });
-
         addMessageHandler(clusterService.messagingService());
     }
 
@@ -732,7 +721,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
         // Empty assignments might be a valid case if tables are created from within cluster init HOCON
         // configuration, which is not supported now.
-        assert newAssignments != null : IgniteStringFormatter.format("Table [id={}] has empty assignments.", tableId);
+        assert newAssignments != null : format("Table [id={}] has empty assignments.", tableId);
 
         int partitions = newAssignments.size();
 
@@ -1293,12 +1282,12 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         pendingTables.put(tableId, table);
         startedTables.put(tableId, table);
 
-        tablesById(causalityToken).thenAccept(ignored -> inBusyLock(busyLock, () -> {
+        createPartsFut.thenAccept(ignored -> inBusyLock(busyLock, () -> {
             pendingTables.remove(tableId);
         }));
 
-        tablesById(causalityToken)
-                .thenRun(() -> inBusyLock(busyLock, () -> completeApiCreateFuture(table)));
+        createPartsFut
+                .thenRunAsync(() -> inBusyLock(busyLock, () -> completeApiCreateFuture(table)));
 
         // TODO should be reworked in IGNITE-16763
         // We use the event notification future as the result so that dependent components can complete the schema updates.
@@ -1307,7 +1296,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         CompletableFuture<?> eventFut = fireEvent(TableEvent.CREATE, new TableEventParameters(causalityToken, tableId));
 
         // TODO: investigate why createParts and eventFutures hangs.
-        return allOf(createPartsFut, eventFut);
+//         return allOf(createPartsFut, eventFut);
+        return completedFuture(false);
     }
 
     /**
@@ -1516,6 +1506,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                         .changeColocationColumns(colocationKeys0.toArray(String[]::new)));
             };
 
+
             return catalogManager.createTable(parameters)
                     .thenApply(ignore -> catalogManager.table(tableName, Long.MAX_VALUE).id())
                     .thenCompose(tableId -> createTableAsyncInternal(tableId, tableName, zoneName, tblChanger));
@@ -2899,8 +2890,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                             }
 
                             if (primaryCols.contains(colName)) {
-                                throw new SqlException(DROP_IDX_COLUMN_CONSTRAINT_ERR, IgniteStringFormatter
-                                        .format("Can`t delete column, belongs to primary key: [name={}]", colName));
+                                throw new SqlException(DROP_IDX_COLUMN_CONSTRAINT_ERR, format("Can`t delete column, belongs to primary key: [name={}]", colName));
                             }
                         }
 
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 8570c7cb6e..7fc676d9b3 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -884,7 +884,7 @@ public class TableManagerTest extends IgniteAbstractTest {
                 workDir,
                 msm,
                 catalogManager,
-                sm = new SchemaManager(revisionUpdater, tblsCfg, msm, catalogManager),
+                sm = new SchemaManager(revisionUpdater, msm, catalogManager),
                 budgetView -> new LocalLogStorageFactory(),
                 new HybridClockImpl(),
                 new OutgoingSnapshotsManager(clusterService.messagingService()),


[ignite-3] 01/04: fixup! Switch TableManager to Catalog events.

Posted by am...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-19499
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 61c307717eebac805e81a92d3101c0e4a41bbbaa
Author: amashenkov <an...@gmail.com>
AuthorDate: Fri Jul 14 12:34:45 2023 +0300

    fixup! Switch TableManager to Catalog events.
---
 .../org/apache/ignite/internal/catalog/CatalogService.java    |  2 ++
 .../apache/ignite/internal/catalog/CatalogServiceImpl.java    | 11 +++++++++++
 2 files changed, 13 insertions(+)

diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
index c7364ebea3..82c61d276e 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
@@ -58,6 +58,8 @@ public interface CatalogService {
 
     CatalogZoneDescriptor zone(int zoneId, long timestamp);
 
+    CatalogZoneDescriptor zone(int zoneId, int version);
+
     CatalogSchemaDescriptor activeSchema(long timestamp);
 
     CatalogSchemaDescriptor activeSchema(@Nullable String schemaName, long timestamp);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
index 01e88bec61..3d27ae868a 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
@@ -241,6 +241,17 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
         return catalogAt(timestamp).zone(zoneId);
     }
 
+    @Override
+    public CatalogZoneDescriptor zone(int zoneId, int version) {
+        Catalog catalog = catalog(version);
+
+        if (catalog == null) {
+            return null;
+        }
+
+        return catalog.zone(zoneId);
+    }
+
     @Override
     public @Nullable CatalogSchemaDescriptor activeSchema(long timestamp) {
         return catalogAt(timestamp).schema(DEFAULT_SCHEMA_NAME);