You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2023/07/27 16:50:33 UTC

[ignite-3] 17/19: wip. Replace direct proxy with waiting for latest catalog version.

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-19499
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 605b8f6b29234288ebeab347ba1b086037041ddb
Author: amashenkov <an...@gmail.com>
AuthorDate: Mon Jul 24 16:35:48 2023 +0300

    wip. Replace direct proxy with waiting for latest catalog version.
---
 .../ignite/internal/catalog/CatalogManager.java    |   8 +
 .../internal/catalog/CatalogManagerImpl.java       |   7 +
 .../ignite/internal/catalog/storage/UpdateLog.java |   5 +
 .../internal/catalog/storage/UpdateLogImpl.java    |   9 +-
 .../apache/ignite/internal/index/IndexManager.java |  10 +-
 .../ignite/internal/schema/SchemaManager.java      |   2 -
 .../sql/engine/exec/UpdatableTableImpl.java        |   2 -
 .../internal/table/distributed/TableManager.java   | 417 +++------------------
 8 files changed, 72 insertions(+), 388 deletions(-)

diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java
index 1d91cc35de..351aaccbfa 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java
@@ -131,4 +131,12 @@ public interface CatalogManager extends IgniteComponent, CatalogService {
      * @return Operation future.
      */
     CompletableFuture<Void> renameDistributionZone(RenameZoneParams params);
+
+    // TODO: replace with safe-time waiting.
+    /**
+     * Waits for the latest catalog version.
+     *
+     * @return Operation future.
+     */
+    CompletableFuture<Void> waitLatest();
 }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index fe30bbcf89..92407146b4 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -699,6 +699,13 @@ public class CatalogManagerImpl extends Producer<CatalogEvent, CatalogEventParam
         throw new SqlException(STMT_VALIDATION_ERR, msg, params);
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> waitLatest() {
+        return updateLog.lastUpdateVersion()
+                .thenCompose(versionTracker::waitFor);
+    }
+
     @FunctionalInterface
     interface UpdateProducer {
         List<UpdateEntry> get(Catalog catalog);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java
index eb17046103..51500d86c5 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java
@@ -58,6 +58,11 @@ public interface UpdateLog extends IgniteComponent {
      */
     @Override void start() throws IgniteInternalException;
 
+    /**
+     * Returns a future that completes with the last update version.
+     */
+    CompletableFuture<Integer> lastUpdateVersion();
+
     /**
      * An interface describing a handler that will receive notification
      * when a new update is added to the log.
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
index 4d0e00198c..2bfa223e00 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
@@ -55,7 +55,7 @@ public class UpdateLogImpl implements UpdateLog {
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
     private final AtomicBoolean stopGuard = new AtomicBoolean();
 
-    private final MetaStorageManager metastore;
+    public final MetaStorageManager metastore;
 
     private volatile OnUpdateHandler onUpdateHandler;
     private volatile @Nullable UpdateListener listener;
@@ -170,6 +170,13 @@ public class UpdateLogImpl implements UpdateLog {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Integer> lastUpdateVersion() {
+        return metastore.get(CatalogKey.currentVersion())
+                .thenApply(e -> e.value() == null ? 0 : ByteUtils.bytesToInt(e.value()));
+    }
+
     private static class CatalogKey {
         private CatalogKey() {
             throw new AssertionError();
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 4a4853c2b8..c9c23d2318 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.index;
 
-import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.util.ArrayUtils.STRING_EMPTY_ARRAY;
 
@@ -201,15 +200,13 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
         try {
             fireEvent(IndexEvent.DROP, new IndexEventParameters(causalityToken, tableId, idxId));
 
-            CompletableFuture<?> dropIndexFuture = tableManager.tableAsync(evt.causalityToken(), tableId)
+            return tableManager.tablesById(evt.causalityToken())
+                    .thenApply(tables -> tables.get(tableId))
                     .thenAccept(table -> {
                         if (table != null) { // in case of DROP TABLE the table will be removed first
                             table.unregisterIndex(idxId);
                         }
                     });
-
-            // TODO: investigate why DropIndexFuture hangs.
-            return completedFuture(null); // dropIndexFuture;
         } catch (Throwable th) {
             LOG.warn("Failed to process drop index event.", th);
 
@@ -267,9 +264,6 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
 
         var storageIndexDescriptor = StorageIndexDescriptor.create(tableDescriptor, indexDescriptor);
 
-        CompletableFuture<?> fireEventFuture =
-                fireEvent(IndexEvent.CREATE, new IndexEventParameters(causalityToken, tableId, indexId, eventIndexDescriptor));
-
         TableImpl table = tableManager.getTable(tableId);
 
         assert table != null : tableId;
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index af0451b259..f383a8f940 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -70,8 +70,6 @@ import org.jetbrains.annotations.Nullable;
  * The class services a management of table schemas.
  */
 public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters> implements IgniteComponent {
-    private static final IgniteLogger LOGGER = Loggers.forClass(SchemaManager.class);
-
     /** Initial version for schemas. */
     public static final int INITIAL_SCHEMA_VERSION = 1;
 
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
index 3f48868249..94503daf75 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
@@ -26,7 +26,6 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -43,7 +42,6 @@ import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.sql.engine.metadata.NodeWithTerm;
 import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
-import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptorImpl;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index ee926b253d..afbc7093be 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -25,14 +25,11 @@ import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.CompletableFuture.runAsync;
 import static java.util.concurrent.CompletableFuture.supplyAsync;
 import static java.util.stream.Collectors.toList;
-import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
 import static org.apache.ignite.internal.causality.IncrementalVersionedValue.dependingOn;
 import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
 import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tableAssignments;
 import static org.apache.ignite.internal.distributionzones.rebalance.ZoneCatalogDescriptorUtils.toZoneDescriptor;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
-import static org.apache.ignite.internal.schema.SchemaManager.INITIAL_SCHEMA_VERSION;
-import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
 import static org.apache.ignite.internal.utils.RebalanceUtil.ASSIGNMENTS_SWITCH_REDUCE_PREFIX;
@@ -57,7 +54,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -81,8 +77,6 @@ import java.util.function.LongFunction;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-import org.apache.ignite.configuration.ConfigurationChangeException;
-import org.apache.ignite.configuration.ConfigurationProperty;
 import org.apache.ignite.internal.affinity.AffinityUtils;
 import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.baseline.BaselineManager;
@@ -94,6 +88,7 @@ import org.apache.ignite.internal.catalog.commands.DropTableParams;
 import org.apache.ignite.internal.catalog.descriptors.CatalogDataStorageDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
@@ -131,17 +126,11 @@ import org.apache.ignite.internal.raft.storage.impl.LogStorageFactoryCreator;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.TablePartitionId;
-import org.apache.ignite.internal.schema.CatalogDescriptorUtils;
 import org.apache.ignite.internal.schema.SchemaManager;
 import org.apache.ignite.internal.schema.SchemaRegistry;
-import org.apache.ignite.internal.schema.configuration.ExtendedTableChange;
 import org.apache.ignite.internal.schema.configuration.GcConfiguration;
 import org.apache.ignite.internal.schema.configuration.TableChange;
-import org.apache.ignite.internal.schema.configuration.TableConfiguration;
-import org.apache.ignite.internal.schema.configuration.TablesChange;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
-import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
-import org.apache.ignite.internal.schema.configuration.storage.DataStorageView;
 import org.apache.ignite.internal.storage.DataStorageManager;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RowId;
@@ -189,9 +178,7 @@ import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.internal.utils.RebalanceUtil;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.lang.DistributionZoneNotFoundException;
 import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.lang.IgniteExceptionUtils;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteSystemProperties;
 import org.apache.ignite.lang.NodeStoppingException;
@@ -203,7 +190,6 @@ import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.TopologyService;
 import org.apache.ignite.raft.jraft.storage.impl.VolatileRaftMetaStorage;
 import org.apache.ignite.table.Table;
-import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -375,8 +361,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
     private final IndexBuilder indexBuilder;
 
-    private final ConfiguredTablesCache configuredTablesCache;
-
     /**
      * Creates a new table manager.
      *
@@ -498,8 +482,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         lowWatermark = new LowWatermark(nodeName, gcConfig.lowWatermark(), clock, txManager, vaultManager, mvGc);
 
         indexBuilder = new IndexBuilder(nodeName, cpus);
-
-        configuredTablesCache = new ConfiguredTablesCache(tablesCfg, getMetadataLocallyOnly);
     }
 
     @Override
@@ -675,7 +657,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
             CatalogTableDescriptor tableDescriptor = catalogManager.table(evt.tableId(), previousCatalogVersion);
             CatalogZoneDescriptor zoneDescriptor = catalogManager.zone(tableDescriptor.zoneId(), previousCatalogVersion);
 
-            dropTableLocally(evt.causalityToken(), tableDescriptor, zoneDescriptor);
+            return dropTableLocally(evt.causalityToken(), tableDescriptor, zoneDescriptor);
         } catch (Throwable th) {
             LOG.warn("Failed to process drop table event.", th);
 
@@ -683,8 +665,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         } finally {
             busyLock.leaveBusy();
         }
-
-        return completedFuture(null);
     }
 
     /**
@@ -1279,15 +1259,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
         createPartsFut.thenRunAsync(() -> inBusyLock(busyLock, () -> completeApiCreateFuture(table)));
 
-        // TODO should be reworked in IGNITE-16763
-        // We use the event notification future as the result so that dependent components can complete the schema updates.
-
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-19913 Possible performance degradation.
-        fireEvent(TableEvent.CREATE, new TableEventParameters(causalityToken, tableId));
-
-        // TODO: investigate why createParts future hangs.
-        // return createPartsFut;
-        return completedFuture(false);
+        return createPartsFut;
     }
 
     /**
@@ -1370,7 +1342,11 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
      * @param tableDescriptor Catalog table descriptor.
      * @param zoneDescriptor Catalog distributed zone descriptor.
      */
-    private void dropTableLocally(long causalityToken, CatalogTableDescriptor tableDescriptor, CatalogZoneDescriptor zoneDescriptor) {
+    private CompletableFuture<Void> dropTableLocally(
+            long causalityToken,
+            CatalogTableDescriptor tableDescriptor,
+            CatalogZoneDescriptor zoneDescriptor
+    ) {
         int tableId = tableDescriptor.id();
         int partitions = zoneDescriptor.partitions();
 
@@ -1431,22 +1407,19 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
             startedTables.remove(tableId);
 
-            fireEvent(TableEvent.DROP, new TableEventParameters(causalityToken, tableId))
-                    .whenComplete((v, e) -> {
-                        Set<ByteArray> assignmentKeys = new HashSet<>();
+            Set<ByteArray> assignmentKeys = new HashSet<>();
 
-                        for (int p = 0; p < partitions; p++) {
-                            assignmentKeys.add(stablePartAssignmentsKey(new TablePartitionId(tableId, p)));
-                        }
+            for (int p = 0; p < partitions; p++) {
+                assignmentKeys.add(stablePartAssignmentsKey(new TablePartitionId(tableId, p)));
+            }
 
-                        metaStorageMgr.removeAll(assignmentKeys);
+            metaStorageMgr.removeAll(assignmentKeys);
 
-                        if (e != null) {
-                            LOG.error("Error on " + TableEvent.DROP + " notification", e);
-                        }
-                    });
+            return completedFuture(null);
         } catch (NodeStoppingException e) {
             fireEvent(TableEvent.DROP, new TableEventParameters(causalityToken, tableId), e);
+
+            return failedFuture(e);
         }
     }
 
@@ -1474,180 +1447,15 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         }
         try {
             String tableName = parameters.tableName();
-            String zoneName = parameters.zone();
-
-            // Copied from DdlCommandHandler
-            Consumer<TableChange> tblChanger = tableChange -> {
-                tableChange.changeColumns(columnsChange -> {
-                    for (var col : parameters.columns()) {
-                        columnsChange.create(col.name(), columnChange -> CatalogDescriptorUtils.convertColumnDefinition(col, columnChange));
-                    }
-                });
-
-                List<String> colocationKeys = nullOrEmpty(parameters.colocationColumns())
-                        ? parameters.primaryKeyColumns()
-                        : parameters.colocationColumns();
-
-                tableChange.changePrimaryKey(pkChange -> pkChange.changeColumns(parameters.primaryKeyColumns().toArray(String[]::new))
-                        .changeColocationColumns(colocationKeys.toArray(String[]::new)));
-            };
 
             return catalogManager.createTable(parameters)
                     .thenApply(ignore -> catalogManager.table(tableName, Long.MAX_VALUE).id())
-                    .thenCompose(tableId -> createTableAsyncInternal(tableId, tableName, zoneName, tblChanger));
+                    .thenApply(tableId -> getTable(tableId));
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    /** See {@link #createTableAsync(CreateTableParams)} for details. */
-    @Deprecated(forRemoval = true)
-    private CompletableFuture<Table> createTableAsyncInternal(
-            int tableId,
-            String name,
-            String zoneName,
-            Consumer<TableChange> tableInitChange
-    ) {
-        CompletableFuture<Table> tblFut = new CompletableFuture<>();
-
-        tableAsyncInternal(name)
-                .handle((tbl, tblEx) -> {
-                    if (tbl != null) {
-                        tblFut.completeExceptionally(new TableAlreadyExistsException(DEFAULT_SCHEMA_NAME, name));
-                    } else if (tblEx != null) {
-                        tblFut.completeExceptionally(tblEx);
-                    } else {
-                        if (!busyLock.enterBusy()) {
-                            NodeStoppingException nodeStoppingException = new NodeStoppingException();
-
-                            tblFut.completeExceptionally(nodeStoppingException);
-
-                            throw new IgniteException(nodeStoppingException);
-                        }
-
-                        try {
-                            distributionZoneManager.zoneIdAsyncInternal(zoneName).handle((zoneId, zoneIdEx) -> {
-                                if (zoneId == null) {
-                                    tblFut.completeExceptionally(new DistributionZoneNotFoundException(zoneName));
-                                } else if (zoneIdEx != null) {
-                                    tblFut.completeExceptionally(zoneIdEx);
-                                } else {
-                                    if (!busyLock.enterBusy()) {
-                                        NodeStoppingException nodeStoppingException = new NodeStoppingException();
-
-                                        tblFut.completeExceptionally(nodeStoppingException);
-
-                                        throw new IgniteException(nodeStoppingException);
-                                    }
-
-                                    try {
-                                        cmgMgr.logicalTopology()
-                                                .handle((cmgTopology, e) -> {
-                                                    if (e == null) {
-                                                        if (!busyLock.enterBusy()) {
-                                                            NodeStoppingException nodeStoppingException = new NodeStoppingException();
-
-                                                            tblFut.completeExceptionally(nodeStoppingException);
-
-                                                            throw new IgniteException(nodeStoppingException);
-                                                        }
-
-                                                        try {
-                                                            changeTablesConfigurationOnTableCreate(
-                                                                    tableId,
-                                                                    name,
-                                                                    zoneId,
-                                                                    tableInitChange,
-                                                                    tblFut
-                                                            );
-                                                        } finally {
-                                                            busyLock.leaveBusy();
-                                                        }
-                                                    } else {
-                                                        tblFut.completeExceptionally(e);
-                                                    }
-
-                                                    return null;
-                                                });
-                                    } finally {
-                                        busyLock.leaveBusy();
-                                    }
-                                }
-
-                                return null;
-                            });
-                        } finally {
-                            busyLock.leaveBusy();
-                        }
-                    }
-
-                    return null;
-                });
-
-        return tblFut;
-    }
-
-    /**
-     * Creates a new table in {@link TablesConfiguration}.
-     *
-     * @param tableId Table id.
-     * @param name Table name.
-     * @param zoneId Distribution zone id.
-     * @param tableInitChange Table changer.
-     * @param tblFut Future representing pending completion of the table creation.
-     */
-    private void changeTablesConfigurationOnTableCreate(
-            int tableId,
-            String name,
-            int zoneId,
-            Consumer<TableChange> tableInitChange,
-            CompletableFuture<Table> tblFut
-    ) {
-        tablesCfg.change(tablesChange -> {
-            incrementTablesGeneration(tablesChange);
-
-            tablesChange.changeTables(tablesListChange -> {
-                if (tablesListChange.get(name) != null) {
-                    throw new TableAlreadyExistsException(DEFAULT_SCHEMA_NAME, name);
-                }
-
-                tablesListChange.create(name, (tableChange) -> {
-                    tableInitChange.accept(tableChange);
-
-                    tableChange.changeZoneId(zoneId);
-
-                    var extConfCh = ((ExtendedTableChange) tableChange);
-
-                    extConfCh.changeId(tableId);
-
-                    tablesChange.changeGlobalIdCounter(tableId);
-
-                    extConfCh.changeSchemaId(INITIAL_SCHEMA_VERSION);
-
-                    tableCreateFuts.put(extConfCh.id(), tblFut);
-                });
-            });
-        }).exceptionally(t -> {
-            Throwable ex = getRootCause(t);
-
-            if (ex instanceof TableAlreadyExistsException) {
-                tblFut.completeExceptionally(ex);
-            } else {
-                LOG.debug("Unable to create table [name={}]", ex, name);
-
-                tblFut.completeExceptionally(ex);
-
-                tableCreateFuts.values().removeIf(fut -> fut == tblFut);
-            }
-
-            return null;
-        });
-    }
-
-    private static void incrementTablesGeneration(TablesChange tablesChange) {
-        tablesChange.changeTablesGeneration(tablesChange.tablesGeneration() + 1);
-    }
-
     /**
      * Alters a cluster table. If an appropriate table does not exist, a future will be completed with {@link TableNotFoundException}.
      *
@@ -1701,84 +1509,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         }
     }
 
-    /**
-     * Alter table configuration.
-     *
-     * @see AlterTableAddColumnParams
-     * @see AlterTableDropColumnParams
-     */
-    @Deprecated(forRemoval = true)
-    private CompletableFuture<Void> alterTableAsyncInternal(String name, Function<TableChange, Boolean> tableChange) {
-        CompletableFuture<Void> tblFut = new CompletableFuture<>();
-
-        tableAsync(name).thenAccept(tbl -> {
-            if (tbl == null) {
-                tblFut.completeExceptionally(new TableNotFoundException(DEFAULT_SCHEMA_NAME, name));
-            } else {
-                tablesCfg.tables().change(ch -> {
-                    if (ch.get(name) == null) {
-                        throw new TableNotFoundException(DEFAULT_SCHEMA_NAME, name);
-                    }
-
-                    ch.update(name, tblCh -> {
-                        if (!tableChange.apply(tblCh)) {
-                            return;
-                        }
-
-                        ExtendedTableChange exTblChange = (ExtendedTableChange) tblCh;
-
-                        exTblChange.changeSchemaId(exTblChange.schemaId() + 1);
-                    });
-                }).whenComplete((res, t) -> {
-                    if (t != null) {
-                        Throwable ex = getRootCause(t);
-
-                        if (ex instanceof TableNotFoundException) {
-                            tblFut.completeExceptionally(ex);
-                        } else {
-                            LOG.debug("Unable to modify table [name={}]", ex, name);
-
-                            tblFut.completeExceptionally(ex);
-                        }
-                    } else {
-                        tblFut.complete(res);
-                    }
-                });
-            }
-        }).exceptionally(th -> {
-            tblFut.completeExceptionally(th);
-
-            return null;
-        });
-
-        return tblFut;
-    }
-
-    /**
-     * Gets a cause exception for a client.
-     *
-     * @param t Exception wrapper.
-     * @return A root exception which will be acceptable to throw for public API.
-     */
-    //TODO: IGNITE-16051 Implement exception converter for public API.
-    private @NotNull IgniteException getRootCause(Throwable t) {
-        Throwable ex;
-
-        if (t instanceof CompletionException) {
-            if (t.getCause() instanceof ConfigurationChangeException) {
-                ex = t.getCause().getCause();
-            } else {
-                ex = t.getCause();
-            }
-
-        } else {
-            ex = t;
-        }
-
-        // TODO https://issues.apache.org/jira/browse/IGNITE-19539
-        return (ex instanceof IgniteException) ? (IgniteException) ex : IgniteExceptionUtils.wrap(ex);
-    }
-
     /**
      * Drops a table with the name specified. If appropriate table does not be found, a future will be completed with
      * {@link TableNotFoundException}.
@@ -1792,55 +1522,12 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
             throw new IgniteException(new NodeStoppingException());
         }
         try {
-            return catalogManager.dropTable(params)
-                    .thenCompose(ignore -> dropTableAsyncInternal(params.tableName()));
+            return catalogManager.dropTable(params);
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    /** See {@link #dropTableAsync(DropTableParams)} for details. */
-    @Deprecated(forRemoval = true)
-    private CompletableFuture<Void> dropTableAsyncInternal(String name) {
-        return tableAsyncInternal(name).thenCompose(tbl -> {
-            // In case of drop it's an optimization that allows not to fire drop-change-closure if there's no such
-            // distributed table and the local config has lagged behind.
-            if (tbl == null) {
-                return failedFuture(new TableNotFoundException(DEFAULT_SCHEMA_NAME, name));
-            }
-
-            return tablesCfg
-                    .change(chg -> {
-                        incrementTablesGeneration(chg);
-
-                        chg
-                                .changeTables(tblChg -> {
-                                    if (tblChg.get(name) == null) {
-                                        throw new TableNotFoundException(DEFAULT_SCHEMA_NAME, name);
-                                    }
-
-                                    tblChg.delete(name);
-                                })
-                                .changeIndexes(idxChg -> {
-                                    for (TableIndexView index : idxChg) {
-                                        if (index.tableId() == tbl.tableId()) {
-                                            idxChg.delete(index.name());
-                                        }
-                                    }
-                                });
-                    })
-                    .exceptionally(t -> {
-                        Throwable ex = getRootCause(t);
-
-                        if (!(ex instanceof TableNotFoundException)) {
-                            LOG.debug("Unable to drop table [name={}]", ex, name);
-                        }
-
-                        throw new CompletionException(ex);
-                    });
-        });
-    }
-
     /** {@inheritDoc} */
     @Override
     public List<Table> tables() {
@@ -1866,7 +1553,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
      * @return Future representing pending completion of the operation.
      */
     private CompletableFuture<List<Table>> tablesAsyncInternal() {
-        return supplyAsync(() -> inBusyLock(busyLock, this::directTableIds), ioExecutor)
+        return inBusyLock(busyLock, this::directTableIds)
                 .thenCompose(tableIds -> inBusyLock(busyLock, () -> {
                     var tableFuts = new CompletableFuture[tableIds.size()];
 
@@ -1897,8 +1584,18 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
      *
      * @return A list of direct table ids.
      */
-    private List<Integer> directTableIds() {
-        return configuredTablesCache.configuredTableIds();
+    private CompletableFuture<List<Integer>> directTableIds() {
+        CompletableFuture<?> res = getMetadataLocallyOnly ? completedFuture(null) : catalogManager.waitLatest();
+        // The catalog is read on ioExecutor once the wait (skipped when getMetadataLocallyOnly is set) completes.
+        return res.thenApplyAsync(ignore -> {
+            CatalogSchemaDescriptor schema = catalogManager.activeSchema(Long.MAX_VALUE);
+            // NOTE(review): Long.MAX_VALUE presumably requests the latest schema - confirm. No schema -> no ids.
+            if (schema == null) {
+                return List.of();
+            }
+            // Map every entry of schema.tables() to its id.
+            return Arrays.stream(schema.tables()).map(CatalogObjectDescriptor::id).collect(toList());
+        }, ioExecutor);
     }
 
     /**
@@ -1908,18 +1605,18 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
      * @return Direct id of the table, or {@code null} if the table with the {@code tblName} has not been found.
      */
     @Nullable
-    private Integer directTableId(String tblName) {
-        try {
-            TableConfiguration exTblCfg = directProxy(tablesCfg.tables()).get(tblName);
+    private CompletableFuture<Integer> directTableId(String tblName) { // NOTE(review): @Nullable presumably refers to the future's value.
+        CompletableFuture<?> res = getMetadataLocallyOnly ? completedFuture(null) : catalogManager.waitLatest();
 
-            if (exTblCfg == null) {
+        return res.thenApplyAsync(ignore -> {
+            CatalogTableDescriptor table = catalogManager.table(tblName, Long.MAX_VALUE);
+            // No descriptor for tblName: the future completes with null instead of an id.
+            if (table == null) {
                 return null;
             } else {
-                return exTblCfg.id().value();
+                return table.id();
             }
-        } catch (NoSuchElementException e) {
-            return null;
-        }
+        }, ioExecutor);
     }
 
     /**
@@ -1930,7 +1627,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
      * @return The future with tables map.
      * @see #assignmentsUpdatedVv
      */
-    private CompletableFuture<Map<Integer, TableImpl>> tablesById(long causalityToken) {
+    public CompletableFuture<Map<Integer, TableImpl>> tablesById(long causalityToken) { // Awaits assignmentsUpdatedVv, then tablesByIdVv.
         return assignmentsUpdatedVv.get(causalityToken).thenCompose(v -> tablesByIdVv.get(causalityToken));
     }
 
@@ -2055,7 +1752,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         }
 
         try {
-            return supplyAsync(() -> inBusyLock(busyLock, () -> directTableId(name)), ioExecutor)
+            return inBusyLock(busyLock, () -> directTableId(name))
                     .thenCompose(tableId -> inBusyLock(busyLock, () -> {
                         if (tableId == null) {
                             return completedFuture(null);
@@ -2137,7 +1834,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
      * @return True when the table is configured into cluster, false otherwise.
      */
     private boolean isTableConfigured(int id) {
-        return configuredTablesCache.isTableConfigured(id);
+        return catalogManager.table(id, catalogManager.activeCatalogVersion(Long.MAX_VALUE)) != null; // Found => configured.
     }
 
     /**
@@ -2506,19 +2203,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         return new PartitionMover(busyLock, () -> internalTable.partitionRaftGroupService(partId));
     }
 
-    /**
-     * Gets a direct accessor for the configuration distributed property. If the metadata access only locally configured the method will
-     * return local property accessor.
-     *
-     * @param property Distributed configuration property to receive direct access.
-     * @param <T> Type of the property accessor.
-     * @return An accessor for distributive property.
-     * @see #getMetadataLocallyOnly
-     */
-    private <T extends ConfigurationProperty<?>> T directProxy(T property) {
-        return getMetadataLocallyOnly ? property : (T) property.directProxy();
-    }
-
     private static PeersAndLearners configurationFromAssignments(Collection<Assignment> assignments) {
         var peers = new HashSet<String>();
         var learners = new HashSet<String>();
@@ -2766,21 +2450,4 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
         return zone.replicas();
     }
-
-    @Deprecated(forRemoval = true)
-    private static CatalogDataStorageDescriptor toDataStorageDescriptor(DataStorageView config) {
-        String dataRegion;
-
-        try {
-            Method dataRegionMethod = config.getClass().getMethod("dataRegion");
-
-            dataRegionMethod.setAccessible(true);
-
-            dataRegion = (String) dataRegionMethod.invoke(config);
-        } catch (ReflectiveOperationException e) {
-            dataRegion = e.getMessage();
-        }
-
-        return new CatalogDataStorageDescriptor(config.name(), dataRegion);
-    }
 }