You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2023/07/27 16:50:33 UTC
[ignite-3] 17/19: wip. Replace direct proxy with waiting for latest catalog version.
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch ignite-19499
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 605b8f6b29234288ebeab347ba1b086037041ddb
Author: amashenkov <an...@gmail.com>
AuthorDate: Mon Jul 24 16:35:48 2023 +0300
wip. Replace direct proxy with waiting for latest catalog version.
---
.../ignite/internal/catalog/CatalogManager.java | 8 +
.../internal/catalog/CatalogManagerImpl.java | 7 +
.../ignite/internal/catalog/storage/UpdateLog.java | 5 +
.../internal/catalog/storage/UpdateLogImpl.java | 9 +-
.../apache/ignite/internal/index/IndexManager.java | 10 +-
.../ignite/internal/schema/SchemaManager.java | 2 -
.../sql/engine/exec/UpdatableTableImpl.java | 2 -
.../internal/table/distributed/TableManager.java | 417 +++------------------
8 files changed, 72 insertions(+), 388 deletions(-)
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java
index 1d91cc35de..351aaccbfa 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java
@@ -131,4 +131,12 @@ public interface CatalogManager extends IgniteComponent, CatalogService {
* @return Operation future.
*/
CompletableFuture<Void> renameDistributionZone(RenameZoneParams params);
+
+ // TODO: replace with safe-time waiting.
+ /**
+ * Waits for latest catalog version.
+ *
+ * @return Operation future.
+ */
+ CompletableFuture<Void> waitLatest();
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index fe30bbcf89..92407146b4 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -699,6 +699,13 @@ public class CatalogManagerImpl extends Producer<CatalogEvent, CatalogEventParam
throw new SqlException(STMT_VALIDATION_ERR, msg, params);
}
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<Void> waitLatest() {
+ return updateLog.lastUpdateVersion()
+ .thenCompose(versionTracker::waitFor);
+ }
+
@FunctionalInterface
interface UpdateProducer {
List<UpdateEntry> get(Catalog catalog);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java
index eb17046103..51500d86c5 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java
@@ -58,6 +58,11 @@ public interface UpdateLog extends IgniteComponent {
*/
@Override void start() throws IgniteInternalException;
+ /**
+ * Returns last update version.
+ */
+ CompletableFuture<Integer> lastUpdateVersion();
+
/**
* An interface describing a handler that will receive notification
* when a new update is added to the log.
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
index 4d0e00198c..2bfa223e00 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
@@ -55,7 +55,7 @@ public class UpdateLogImpl implements UpdateLog {
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
private final AtomicBoolean stopGuard = new AtomicBoolean();
- private final MetaStorageManager metastore;
+ public final MetaStorageManager metastore;
private volatile OnUpdateHandler onUpdateHandler;
private volatile @Nullable UpdateListener listener;
@@ -170,6 +170,13 @@ public class UpdateLogImpl implements UpdateLog {
}
}
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<Integer> lastUpdateVersion() {
+ return metastore.get(CatalogKey.currentVersion())
+ .thenApply(e -> e.value() == null ? 0 : ByteUtils.bytesToInt(e.value()));
+ }
+
private static class CatalogKey {
private CatalogKey() {
throw new AssertionError();
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 4a4853c2b8..c9c23d2318 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.index;
-import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.util.ArrayUtils.STRING_EMPTY_ARRAY;
@@ -201,15 +200,13 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
try {
fireEvent(IndexEvent.DROP, new IndexEventParameters(causalityToken, tableId, idxId));
- CompletableFuture<?> dropIndexFuture = tableManager.tableAsync(evt.causalityToken(), tableId)
+ return tableManager.tablesById(evt.causalityToken())
+ .thenApply(tables -> tables.get(tableId))
.thenAccept(table -> {
if (table != null) { // in case of DROP TABLE the table will be removed first
table.unregisterIndex(idxId);
}
});
-
- // TODO: investigate why DropIndexFuture hangs.
- return completedFuture(null); // dropIndexFuture;
} catch (Throwable th) {
LOG.warn("Failed to process drop index event.", th);
@@ -267,9 +264,6 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
var storageIndexDescriptor = StorageIndexDescriptor.create(tableDescriptor, indexDescriptor);
- CompletableFuture<?> fireEventFuture =
- fireEvent(IndexEvent.CREATE, new IndexEventParameters(causalityToken, tableId, indexId, eventIndexDescriptor));
-
TableImpl table = tableManager.getTable(tableId);
assert table != null : tableId;
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index af0451b259..f383a8f940 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -70,8 +70,6 @@ import org.jetbrains.annotations.Nullable;
* The class services a management of table schemas.
*/
public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters> implements IgniteComponent {
- private static final IgniteLogger LOGGER = Loggers.forClass(SchemaManager.class);
-
/** Initial version for schemas. */
public static final int INITIAL_SCHEMA_VERSION = 1;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
index 3f48868249..94503daf75 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
@@ -26,7 +26,6 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -43,7 +42,6 @@ import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.schema.row.RowAssembler;
import org.apache.ignite.internal.sql.engine.metadata.NodeWithTerm;
import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
-import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptorImpl;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.util.TypeUtils;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index ee926b253d..afbc7093be 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -25,14 +25,11 @@ import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.stream.Collectors.toList;
-import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
import static org.apache.ignite.internal.causality.IncrementalVersionedValue.dependingOn;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tableAssignments;
import static org.apache.ignite.internal.distributionzones.rebalance.ZoneCatalogDescriptorUtils.toZoneDescriptor;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
-import static org.apache.ignite.internal.schema.SchemaManager.INITIAL_SCHEMA_VERSION;
-import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import static org.apache.ignite.internal.utils.RebalanceUtil.ASSIGNMENTS_SWITCH_REDUCE_PREFIX;
@@ -57,7 +54,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -81,8 +77,6 @@ import java.util.function.LongFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import org.apache.ignite.configuration.ConfigurationChangeException;
-import org.apache.ignite.configuration.ConfigurationProperty;
import org.apache.ignite.internal.affinity.AffinityUtils;
import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.baseline.BaselineManager;
@@ -94,6 +88,7 @@ import org.apache.ignite.internal.catalog.commands.DropTableParams;
import org.apache.ignite.internal.catalog.descriptors.CatalogDataStorageDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
@@ -131,17 +126,11 @@ import org.apache.ignite.internal.raft.storage.impl.LogStorageFactoryCreator;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.TablePartitionId;
-import org.apache.ignite.internal.schema.CatalogDescriptorUtils;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.SchemaRegistry;
-import org.apache.ignite.internal.schema.configuration.ExtendedTableChange;
import org.apache.ignite.internal.schema.configuration.GcConfiguration;
import org.apache.ignite.internal.schema.configuration.TableChange;
-import org.apache.ignite.internal.schema.configuration.TableConfiguration;
-import org.apache.ignite.internal.schema.configuration.TablesChange;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
-import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
-import org.apache.ignite.internal.schema.configuration.storage.DataStorageView;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
@@ -189,9 +178,7 @@ import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.utils.RebalanceUtil;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.lang.DistributionZoneNotFoundException;
import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.lang.IgniteExceptionUtils;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteSystemProperties;
import org.apache.ignite.lang.NodeStoppingException;
@@ -203,7 +190,6 @@ import org.apache.ignite.network.MessagingService;
import org.apache.ignite.network.TopologyService;
import org.apache.ignite.raft.jraft.storage.impl.VolatileRaftMetaStorage;
import org.apache.ignite.table.Table;
-import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -375,8 +361,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
private final IndexBuilder indexBuilder;
- private final ConfiguredTablesCache configuredTablesCache;
-
/**
* Creates a new table manager.
*
@@ -498,8 +482,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
lowWatermark = new LowWatermark(nodeName, gcConfig.lowWatermark(), clock, txManager, vaultManager, mvGc);
indexBuilder = new IndexBuilder(nodeName, cpus);
-
- configuredTablesCache = new ConfiguredTablesCache(tablesCfg, getMetadataLocallyOnly);
}
@Override
@@ -675,7 +657,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
CatalogTableDescriptor tableDescriptor = catalogManager.table(evt.tableId(), previousCatalogVersion);
CatalogZoneDescriptor zoneDescriptor = catalogManager.zone(tableDescriptor.zoneId(), previousCatalogVersion);
- dropTableLocally(evt.causalityToken(), tableDescriptor, zoneDescriptor);
+ return dropTableLocally(evt.causalityToken(), tableDescriptor, zoneDescriptor);
} catch (Throwable th) {
LOG.warn("Failed to process drop table event.", th);
@@ -683,8 +665,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
} finally {
busyLock.leaveBusy();
}
-
- return completedFuture(null);
}
/**
@@ -1279,15 +1259,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
createPartsFut.thenRunAsync(() -> inBusyLock(busyLock, () -> completeApiCreateFuture(table)));
- // TODO should be reworked in IGNITE-16763
- // We use the event notification future as the result so that dependent components can complete the schema updates.
-
- // TODO: https://issues.apache.org/jira/browse/IGNITE-19913 Possible performance degradation.
- fireEvent(TableEvent.CREATE, new TableEventParameters(causalityToken, tableId));
-
- // TODO: investigate why createParts future hangs.
- // return createPartsFut;
- return completedFuture(false);
+ return createPartsFut;
}
/**
@@ -1370,7 +1342,11 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
* @param tableDescriptor Catalog table descriptor.
* @param zoneDescriptor Catalog distributed zone descriptor.
*/
- private void dropTableLocally(long causalityToken, CatalogTableDescriptor tableDescriptor, CatalogZoneDescriptor zoneDescriptor) {
+ private CompletableFuture<Void> dropTableLocally(
+ long causalityToken,
+ CatalogTableDescriptor tableDescriptor,
+ CatalogZoneDescriptor zoneDescriptor
+ ) {
int tableId = tableDescriptor.id();
int partitions = zoneDescriptor.partitions();
@@ -1431,22 +1407,19 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
startedTables.remove(tableId);
- fireEvent(TableEvent.DROP, new TableEventParameters(causalityToken, tableId))
- .whenComplete((v, e) -> {
- Set<ByteArray> assignmentKeys = new HashSet<>();
+ Set<ByteArray> assignmentKeys = new HashSet<>();
- for (int p = 0; p < partitions; p++) {
- assignmentKeys.add(stablePartAssignmentsKey(new TablePartitionId(tableId, p)));
- }
+ for (int p = 0; p < partitions; p++) {
+ assignmentKeys.add(stablePartAssignmentsKey(new TablePartitionId(tableId, p)));
+ }
- metaStorageMgr.removeAll(assignmentKeys);
+ metaStorageMgr.removeAll(assignmentKeys);
- if (e != null) {
- LOG.error("Error on " + TableEvent.DROP + " notification", e);
- }
- });
+ return completedFuture(null);
} catch (NodeStoppingException e) {
fireEvent(TableEvent.DROP, new TableEventParameters(causalityToken, tableId), e);
+
+ return failedFuture(e);
}
}
@@ -1474,180 +1447,15 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
}
try {
String tableName = parameters.tableName();
- String zoneName = parameters.zone();
-
- // Copied from DdlCommandHandler
- Consumer<TableChange> tblChanger = tableChange -> {
- tableChange.changeColumns(columnsChange -> {
- for (var col : parameters.columns()) {
- columnsChange.create(col.name(), columnChange -> CatalogDescriptorUtils.convertColumnDefinition(col, columnChange));
- }
- });
-
- List<String> colocationKeys = nullOrEmpty(parameters.colocationColumns())
- ? parameters.primaryKeyColumns()
- : parameters.colocationColumns();
-
- tableChange.changePrimaryKey(pkChange -> pkChange.changeColumns(parameters.primaryKeyColumns().toArray(String[]::new))
- .changeColocationColumns(colocationKeys.toArray(String[]::new)));
- };
return catalogManager.createTable(parameters)
.thenApply(ignore -> catalogManager.table(tableName, Long.MAX_VALUE).id())
- .thenCompose(tableId -> createTableAsyncInternal(tableId, tableName, zoneName, tblChanger));
+ .thenApply(tableId -> getTable(tableId));
} finally {
busyLock.leaveBusy();
}
}
- /** See {@link #createTableAsync(CreateTableParams)} for details. */
- @Deprecated(forRemoval = true)
- private CompletableFuture<Table> createTableAsyncInternal(
- int tableId,
- String name,
- String zoneName,
- Consumer<TableChange> tableInitChange
- ) {
- CompletableFuture<Table> tblFut = new CompletableFuture<>();
-
- tableAsyncInternal(name)
- .handle((tbl, tblEx) -> {
- if (tbl != null) {
- tblFut.completeExceptionally(new TableAlreadyExistsException(DEFAULT_SCHEMA_NAME, name));
- } else if (tblEx != null) {
- tblFut.completeExceptionally(tblEx);
- } else {
- if (!busyLock.enterBusy()) {
- NodeStoppingException nodeStoppingException = new NodeStoppingException();
-
- tblFut.completeExceptionally(nodeStoppingException);
-
- throw new IgniteException(nodeStoppingException);
- }
-
- try {
- distributionZoneManager.zoneIdAsyncInternal(zoneName).handle((zoneId, zoneIdEx) -> {
- if (zoneId == null) {
- tblFut.completeExceptionally(new DistributionZoneNotFoundException(zoneName));
- } else if (zoneIdEx != null) {
- tblFut.completeExceptionally(zoneIdEx);
- } else {
- if (!busyLock.enterBusy()) {
- NodeStoppingException nodeStoppingException = new NodeStoppingException();
-
- tblFut.completeExceptionally(nodeStoppingException);
-
- throw new IgniteException(nodeStoppingException);
- }
-
- try {
- cmgMgr.logicalTopology()
- .handle((cmgTopology, e) -> {
- if (e == null) {
- if (!busyLock.enterBusy()) {
- NodeStoppingException nodeStoppingException = new NodeStoppingException();
-
- tblFut.completeExceptionally(nodeStoppingException);
-
- throw new IgniteException(nodeStoppingException);
- }
-
- try {
- changeTablesConfigurationOnTableCreate(
- tableId,
- name,
- zoneId,
- tableInitChange,
- tblFut
- );
- } finally {
- busyLock.leaveBusy();
- }
- } else {
- tblFut.completeExceptionally(e);
- }
-
- return null;
- });
- } finally {
- busyLock.leaveBusy();
- }
- }
-
- return null;
- });
- } finally {
- busyLock.leaveBusy();
- }
- }
-
- return null;
- });
-
- return tblFut;
- }
-
- /**
- * Creates a new table in {@link TablesConfiguration}.
- *
- * @param tableId Table id.
- * @param name Table name.
- * @param zoneId Distribution zone id.
- * @param tableInitChange Table changer.
- * @param tblFut Future representing pending completion of the table creation.
- */
- private void changeTablesConfigurationOnTableCreate(
- int tableId,
- String name,
- int zoneId,
- Consumer<TableChange> tableInitChange,
- CompletableFuture<Table> tblFut
- ) {
- tablesCfg.change(tablesChange -> {
- incrementTablesGeneration(tablesChange);
-
- tablesChange.changeTables(tablesListChange -> {
- if (tablesListChange.get(name) != null) {
- throw new TableAlreadyExistsException(DEFAULT_SCHEMA_NAME, name);
- }
-
- tablesListChange.create(name, (tableChange) -> {
- tableInitChange.accept(tableChange);
-
- tableChange.changeZoneId(zoneId);
-
- var extConfCh = ((ExtendedTableChange) tableChange);
-
- extConfCh.changeId(tableId);
-
- tablesChange.changeGlobalIdCounter(tableId);
-
- extConfCh.changeSchemaId(INITIAL_SCHEMA_VERSION);
-
- tableCreateFuts.put(extConfCh.id(), tblFut);
- });
- });
- }).exceptionally(t -> {
- Throwable ex = getRootCause(t);
-
- if (ex instanceof TableAlreadyExistsException) {
- tblFut.completeExceptionally(ex);
- } else {
- LOG.debug("Unable to create table [name={}]", ex, name);
-
- tblFut.completeExceptionally(ex);
-
- tableCreateFuts.values().removeIf(fut -> fut == tblFut);
- }
-
- return null;
- });
- }
-
- private static void incrementTablesGeneration(TablesChange tablesChange) {
- tablesChange.changeTablesGeneration(tablesChange.tablesGeneration() + 1);
- }
-
/**
* Alters a cluster table. If an appropriate table does not exist, a future will be completed with {@link TableNotFoundException}.
*
@@ -1701,84 +1509,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
}
}
- /**
- * Alter table configuration.
- *
- * @see AlterTableAddColumnParams
- * @see AlterTableDropColumnParams
- */
- @Deprecated(forRemoval = true)
- private CompletableFuture<Void> alterTableAsyncInternal(String name, Function<TableChange, Boolean> tableChange) {
- CompletableFuture<Void> tblFut = new CompletableFuture<>();
-
- tableAsync(name).thenAccept(tbl -> {
- if (tbl == null) {
- tblFut.completeExceptionally(new TableNotFoundException(DEFAULT_SCHEMA_NAME, name));
- } else {
- tablesCfg.tables().change(ch -> {
- if (ch.get(name) == null) {
- throw new TableNotFoundException(DEFAULT_SCHEMA_NAME, name);
- }
-
- ch.update(name, tblCh -> {
- if (!tableChange.apply(tblCh)) {
- return;
- }
-
- ExtendedTableChange exTblChange = (ExtendedTableChange) tblCh;
-
- exTblChange.changeSchemaId(exTblChange.schemaId() + 1);
- });
- }).whenComplete((res, t) -> {
- if (t != null) {
- Throwable ex = getRootCause(t);
-
- if (ex instanceof TableNotFoundException) {
- tblFut.completeExceptionally(ex);
- } else {
- LOG.debug("Unable to modify table [name={}]", ex, name);
-
- tblFut.completeExceptionally(ex);
- }
- } else {
- tblFut.complete(res);
- }
- });
- }
- }).exceptionally(th -> {
- tblFut.completeExceptionally(th);
-
- return null;
- });
-
- return tblFut;
- }
-
- /**
- * Gets a cause exception for a client.
- *
- * @param t Exception wrapper.
- * @return A root exception which will be acceptable to throw for public API.
- */
- //TODO: IGNITE-16051 Implement exception converter for public API.
- private @NotNull IgniteException getRootCause(Throwable t) {
- Throwable ex;
-
- if (t instanceof CompletionException) {
- if (t.getCause() instanceof ConfigurationChangeException) {
- ex = t.getCause().getCause();
- } else {
- ex = t.getCause();
- }
-
- } else {
- ex = t;
- }
-
- // TODO https://issues.apache.org/jira/browse/IGNITE-19539
- return (ex instanceof IgniteException) ? (IgniteException) ex : IgniteExceptionUtils.wrap(ex);
- }
-
/**
* Drops a table with the name specified. If appropriate table does not be found, a future will be completed with
* {@link TableNotFoundException}.
@@ -1792,55 +1522,12 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
throw new IgniteException(new NodeStoppingException());
}
try {
- return catalogManager.dropTable(params)
- .thenCompose(ignore -> dropTableAsyncInternal(params.tableName()));
+ return catalogManager.dropTable(params);
} finally {
busyLock.leaveBusy();
}
}
- /** See {@link #dropTableAsync(DropTableParams)} for details. */
- @Deprecated(forRemoval = true)
- private CompletableFuture<Void> dropTableAsyncInternal(String name) {
- return tableAsyncInternal(name).thenCompose(tbl -> {
- // In case of drop it's an optimization that allows not to fire drop-change-closure if there's no such
- // distributed table and the local config has lagged behind.
- if (tbl == null) {
- return failedFuture(new TableNotFoundException(DEFAULT_SCHEMA_NAME, name));
- }
-
- return tablesCfg
- .change(chg -> {
- incrementTablesGeneration(chg);
-
- chg
- .changeTables(tblChg -> {
- if (tblChg.get(name) == null) {
- throw new TableNotFoundException(DEFAULT_SCHEMA_NAME, name);
- }
-
- tblChg.delete(name);
- })
- .changeIndexes(idxChg -> {
- for (TableIndexView index : idxChg) {
- if (index.tableId() == tbl.tableId()) {
- idxChg.delete(index.name());
- }
- }
- });
- })
- .exceptionally(t -> {
- Throwable ex = getRootCause(t);
-
- if (!(ex instanceof TableNotFoundException)) {
- LOG.debug("Unable to drop table [name={}]", ex, name);
- }
-
- throw new CompletionException(ex);
- });
- });
- }
-
/** {@inheritDoc} */
@Override
public List<Table> tables() {
@@ -1866,7 +1553,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
* @return Future representing pending completion of the operation.
*/
private CompletableFuture<List<Table>> tablesAsyncInternal() {
- return supplyAsync(() -> inBusyLock(busyLock, this::directTableIds), ioExecutor)
+ return inBusyLock(busyLock, this::directTableIds)
.thenCompose(tableIds -> inBusyLock(busyLock, () -> {
var tableFuts = new CompletableFuture[tableIds.size()];
@@ -1897,8 +1584,18 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
*
* @return A list of direct table ids.
*/
- private List<Integer> directTableIds() {
- return configuredTablesCache.configuredTableIds();
+ private CompletableFuture<List<Integer>> directTableIds() {
+ CompletableFuture<?> res = getMetadataLocallyOnly ? completedFuture(null) : catalogManager.waitLatest();
+
+ return res.thenApplyAsync(ignore -> {
+ CatalogSchemaDescriptor schema = catalogManager.activeSchema(Long.MAX_VALUE);
+
+ if (schema == null) {
+ return List.of();
+ }
+
+ return Arrays.stream(schema.tables()).map(CatalogObjectDescriptor::id).collect(toList());
+ }, ioExecutor);
}
/**
@@ -1908,18 +1605,18 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
* @return Direct id of the table, or {@code null} if the table with the {@code tblName} has not been found.
*/
@Nullable
- private Integer directTableId(String tblName) {
- try {
- TableConfiguration exTblCfg = directProxy(tablesCfg.tables()).get(tblName);
+ private CompletableFuture<Integer> directTableId(String tblName) {
+ CompletableFuture<?> res = getMetadataLocallyOnly ? completedFuture(null) : catalogManager.waitLatest();
- if (exTblCfg == null) {
+ return res.thenApplyAsync(ignore -> {
+ CatalogTableDescriptor table = catalogManager.table(tblName, Long.MAX_VALUE);
+
+ if (table == null) {
return null;
} else {
- return exTblCfg.id().value();
+ return table.id();
}
- } catch (NoSuchElementException e) {
- return null;
- }
+ }, ioExecutor);
}
/**
@@ -1930,7 +1627,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
* @return The future with tables map.
* @see #assignmentsUpdatedVv
*/
- private CompletableFuture<Map<Integer, TableImpl>> tablesById(long causalityToken) {
+ public CompletableFuture<Map<Integer, TableImpl>> tablesById(long causalityToken) {
return assignmentsUpdatedVv.get(causalityToken).thenCompose(v -> tablesByIdVv.get(causalityToken));
}
@@ -2055,7 +1752,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
}
try {
- return supplyAsync(() -> inBusyLock(busyLock, () -> directTableId(name)), ioExecutor)
+ return inBusyLock(busyLock, () -> directTableId(name))
.thenCompose(tableId -> inBusyLock(busyLock, () -> {
if (tableId == null) {
return completedFuture(null);
@@ -2137,7 +1834,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
* @return True when the table is configured into cluster, false otherwise.
*/
private boolean isTableConfigured(int id) {
- return configuredTablesCache.isTableConfigured(id);
+ return catalogManager.table(id, catalogManager.activeCatalogVersion(Long.MAX_VALUE)) != null;
}
/**
@@ -2506,19 +2203,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
return new PartitionMover(busyLock, () -> internalTable.partitionRaftGroupService(partId));
}
- /**
- * Gets a direct accessor for the configuration distributed property. If the metadata access only locally configured the method will
- * return local property accessor.
- *
- * @param property Distributed configuration property to receive direct access.
- * @param <T> Type of the property accessor.
- * @return An accessor for distributive property.
- * @see #getMetadataLocallyOnly
- */
- private <T extends ConfigurationProperty<?>> T directProxy(T property) {
- return getMetadataLocallyOnly ? property : (T) property.directProxy();
- }
-
private static PeersAndLearners configurationFromAssignments(Collection<Assignment> assignments) {
var peers = new HashSet<String>();
var learners = new HashSet<String>();
@@ -2766,21 +2450,4 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
return zone.replicas();
}
-
- @Deprecated(forRemoval = true)
- private static CatalogDataStorageDescriptor toDataStorageDescriptor(DataStorageView config) {
- String dataRegion;
-
- try {
- Method dataRegionMethod = config.getClass().getMethod("dataRegion");
-
- dataRegionMethod.setAccessible(true);
-
- dataRegion = (String) dataRegionMethod.invoke(config);
- } catch (ReflectiveOperationException e) {
- dataRegion = e.getMessage();
- }
-
- return new CatalogDataStorageDescriptor(config.name(), dataRegion);
- }
}