You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2023/07/25 10:00:25 UTC
[ignite-3] 05/11: wip. Minors after review.
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch ignite-19499
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit cbfe31714e81093730d736c5b3abcd9889cde54f
Author: amashenkov <an...@gmail.com>
AuthorDate: Thu Jul 20 17:42:44 2023 +0300
wip. Minors after review.
---
.../internal/catalog/CatalogServiceImpl.java | 2 +-
.../catalog/commands/CreateTableParams.java | 10 +++---
.../distributionzones/DistributionZoneManager.java | 2 +-
.../apache/ignite/internal/index/IndexManager.java | 8 ++---
.../ignite/internal/index/ItIndexManagerTest.java | 6 ++--
.../ignite/internal/schema/SchemaManager.java | 3 +-
.../internal/table/distributed/TableManager.java | 36 ++++++++++------------
7 files changed, 33 insertions(+), 34 deletions(-)
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
index b4fbb777f8..f8d6d50cb7 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
@@ -290,7 +290,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
validateCreateTableParams(params);
- CatalogZoneDescriptor zone = getZone(catalog, Objects.requireNonNullElse(params.zone(), DEFAULT_ZONE_NAME));
+ CatalogZoneDescriptor zone = getZone(catalog, params.zone());
int id = catalog.objectIdGenState();
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableParams.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableParams.java
index 525219ae2a..884b8090f4 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableParams.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableParams.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.catalog.commands;
import java.util.List;
+import java.util.Objects;
+import org.apache.ignite.internal.catalog.CatalogService;
import org.jetbrains.annotations.Nullable;
/**
@@ -41,8 +43,7 @@ public class CreateTableParams extends AbstractTableCommandParams {
private List<ColumnParams> cols;
/** Distribution zone name. */
- @Nullable
- private String zone;
+ private String zone = CatalogService.DEFAULT_ZONE_NAME;
private CreateTableParams() {
}
@@ -72,7 +73,6 @@ public class CreateTableParams extends AbstractTableCommandParams {
/**
* Gets zone name.
*/
- @Nullable
public String zone() {
return zone;
}
@@ -126,8 +126,8 @@ public class CreateTableParams extends AbstractTableCommandParams {
* @param zoneName Zone name.
* @return {@code this}.
*/
- public Builder zone(@Nullable String zoneName) {
- params.zone = zoneName;
+ public Builder zone(String zoneName) {
+ params.zone = Objects.requireNonNullElse(zoneName, CatalogService.DEFAULT_ZONE_NAME);
return this;
}
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 8f67682499..3e3df85e39 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -191,7 +191,7 @@ public class DistributionZoneManager implements IgniteComponent {
/** Tables configuration. */
private final TablesConfiguration tablesConfiguration;
- /** Catalog service. */
+ /** Catalog manager. */
private final CatalogManager catalogManager;
/** Meta Storage manager. */
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 89fee4b074..7e5caf9491 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.index;
import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.util.ArrayUtils.STRING_EMPTY_ARRAY;
@@ -199,10 +200,9 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
}
try {
- CompletableFuture<?> fireEventFuture = fireEvent(IndexEvent.DROP, new IndexEventParameters(causalityToken, tableId, idxId));
+ fireEvent(IndexEvent.DROP, new IndexEventParameters(causalityToken, tableId, idxId));
- catalogManager.table(tableId, evt.catalogVersion());
- CompletableFuture<?> dropIndexFuture = tableManager.tableAsync(tableId)
+ CompletableFuture<?> dropIndexFuture = tableManager.tableAsync(evt.causalityToken(), tableId)
.thenAccept(table -> {
if (table != null) { // in case of DROP TABLE the table will be removed first
table.unregisterIndex(idxId);
@@ -210,7 +210,7 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
});
// TODO: investigate why DropIndexFuture hangs.
- return allOf(fireEventFuture, dropIndexFuture);
+ return completedFuture(null); // dropIndexFuture;
} catch (Throwable th) {
LOG.warn("Failed to process drop index event.", th);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
index 242c4fbdca..13350c0bd6 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.commands.CreateHashIndexParams;
import org.apache.ignite.internal.catalog.commands.DropIndexParams;
import org.apache.ignite.internal.index.event.IndexEvent;
@@ -72,7 +73,7 @@ public class ItIndexManagerTest extends ClusterPerClassIntegrationTest {
await(indexManager.createIndexAsync(
CreateHashIndexParams.builder()
- .schemaName("PUBLIC")
+ .schemaName(CatalogService.DEFAULT_SCHEMA_NAME)
.indexName("INAME")
.tableName("TNAME")
.columns(List.of("C3", "C2"))
@@ -93,7 +94,8 @@ public class ItIndexManagerTest extends ClusterPerClassIntegrationTest {
CompletableFuture<IndexEventParameters> indexDroppedFuture = registerListener(indexManager, IndexEvent.DROP);
- await(indexManager.dropIndexAsync(DropIndexParams.builder().schemaName("PUBLIC").indexName("INAME").build()));
+ await(indexManager.dropIndexAsync(DropIndexParams.builder()
+ .schemaName(CatalogService.DEFAULT_SCHEMA_NAME).indexName("INAME").build()));
{
IndexEventParameters params = await(indexDroppedFuture);
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index 2eac385ff1..d3597ed730 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -94,7 +94,8 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
public SchemaManager(
Consumer<LongFunction<CompletableFuture<?>>> registry,
MetaStorageManager metastorageMgr,
- CatalogManager catalogManager) {
+ CatalogManager catalogManager
+ ) {
this.registriesVv = new IncrementalVersionedValue<>(registry, HashMap::new);
this.catalogManager = catalogManager;
this.metastorageMgr = metastorageMgr;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 697b7933bf..5c00af1fc3 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -26,7 +26,6 @@ import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
-import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_ZONE_NAME;
import static org.apache.ignite.internal.causality.IncrementalVersionedValue.dependingOn;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.getZoneById;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
@@ -98,6 +97,7 @@ import org.apache.ignite.internal.catalog.commands.ColumnParams;
import org.apache.ignite.internal.catalog.commands.CreateTableParams;
import org.apache.ignite.internal.catalog.commands.DropTableParams;
import org.apache.ignite.internal.catalog.descriptors.CatalogDataStorageDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
@@ -614,10 +614,16 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
assert !assignments.isEmpty() : "Couldn't create the table with empty assignments.";
+ int[] indexesToWait = Arrays.stream(catalogManager.schema(evt.catalogVersion()).indexes())
+ .filter(i -> i.tableId() == tableId)
+ .mapToInt(CatalogObjectDescriptor::id)
+ .toArray();
+
CompletableFuture<?> createTableFut = createTableLocally(
evt.causalityToken(),
tableDescriptor,
zoneDescriptor,
+ indexesToWait,
assignments
).whenComplete((v, e) -> {
if (e == null) {
@@ -1231,12 +1237,14 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
* @param causalityToken Causality token.
* @param tableDescriptor Catalog table descriptor.
* @param zoneDescriptor Catalog distributed zone descriptor.
+ * @param indexesToWait
* @return Future that will be completed when local changes related to the table creation are applied.
*/
private CompletableFuture<?> createTableLocally(
long causalityToken,
CatalogTableDescriptor tableDescriptor,
CatalogZoneDescriptor zoneDescriptor,
+ int[] indexesToWait,
List<Set<Assignment>> assignments
) {
String tableName = tableDescriptor.name();
@@ -1257,7 +1265,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
var table = new TableImpl(internalTable, lockMgr);
// TODO: IGNITE-19082 Need another way to wait for indexes
- table.addIndexesToWait(collectTableIndexIds(tableId));
+// table.addIndexesToWait(indexesToWait);
tablesByIdVv.update(causalityToken, (previous, e) -> inBusyLock(busyLock, () -> {
if (e != null) {
@@ -1294,7 +1302,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
// TODO: https://issues.apache.org/jira/browse/IGNITE-19913 Possible performance degradation.
fireEvent(TableEvent.CREATE, new TableEventParameters(causalityToken, tableId));
- // TODO: investigate why createParts hangs.
+ // TODO: investigate why createParts future hangs.
return completedFuture(false);
}
@@ -1482,7 +1490,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
}
try {
String tableName = parameters.tableName();
- String zoneName = Objects.requireNonNullElse(parameters.zone(), DEFAULT_ZONE_NAME);
+ String zoneName = parameters.zone();
// Copied from DdlCommandHandler
Consumer<TableChange> tblChanger = tableChange -> {
@@ -1492,19 +1500,14 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
}
});
- var colocationKeys = parameters.colocationColumns();
-
- if (nullOrEmpty(colocationKeys)) {
- colocationKeys = parameters.primaryKeyColumns();
- }
-
- var colocationKeys0 = colocationKeys;
+ List<String> colocationKeys = nullOrEmpty(parameters.colocationColumns())
+ ? parameters.primaryKeyColumns()
+ : parameters.colocationColumns();
tableChange.changePrimaryKey(pkChange -> pkChange.changeColumns(parameters.primaryKeyColumns().toArray(String[]::new))
- .changeColocationColumns(colocationKeys0.toArray(String[]::new)));
+ .changeColocationColumns(colocationKeys.toArray(String[]::new)));
};
-
return catalogManager.createTable(parameters)
.thenApply(ignore -> catalogManager.table(tableName, Long.MAX_VALUE).id())
.thenCompose(tableId -> createTableAsyncInternal(tableId, tableName, zoneName, tblChanger));
@@ -2721,13 +2724,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
}
}
- private int[] collectTableIndexIds(int tableId) {
- return tablesCfg.value().indexes().stream()
- .filter(tableIndexView -> tableIndexView.tableId() == tableId)
- .mapToInt(TableIndexView::id)
- .toArray();
- }
-
private static void closePartitionTrackers(InternalTable internalTable, int partitionId) {
closeTracker(internalTable.getPartitionSafeTimeTracker(partitionId));