You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2023/07/25 10:00:25 UTC

[ignite-3] 05/11: wip. Minors after review.

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-19499
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit cbfe31714e81093730d736c5b3abcd9889cde54f
Author: amashenkov <an...@gmail.com>
AuthorDate: Thu Jul 20 17:42:44 2023 +0300

    wip. Minors after review.
---
 .../internal/catalog/CatalogServiceImpl.java       |  2 +-
 .../catalog/commands/CreateTableParams.java        | 10 +++---
 .../distributionzones/DistributionZoneManager.java |  2 +-
 .../apache/ignite/internal/index/IndexManager.java |  8 ++---
 .../ignite/internal/index/ItIndexManagerTest.java  |  6 ++--
 .../ignite/internal/schema/SchemaManager.java      |  3 +-
 .../internal/table/distributed/TableManager.java   | 36 ++++++++++------------
 7 files changed, 33 insertions(+), 34 deletions(-)

diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
index b4fbb777f8..f8d6d50cb7 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
@@ -290,7 +290,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
 
             validateCreateTableParams(params);
 
-            CatalogZoneDescriptor zone = getZone(catalog, Objects.requireNonNullElse(params.zone(), DEFAULT_ZONE_NAME));
+            CatalogZoneDescriptor zone = getZone(catalog, params.zone());
 
             int id = catalog.objectIdGenState();
 
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableParams.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableParams.java
index 525219ae2a..884b8090f4 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableParams.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableParams.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.catalog.commands;
 
 import java.util.List;
+import java.util.Objects;
+import org.apache.ignite.internal.catalog.CatalogService;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -41,8 +43,7 @@ public class CreateTableParams extends AbstractTableCommandParams {
     private List<ColumnParams> cols;
 
     /** Distribution zone name. */
-    @Nullable
-    private String zone;
+    private String zone = CatalogService.DEFAULT_ZONE_NAME;
 
     private CreateTableParams() {
     }
@@ -72,7 +73,6 @@ public class CreateTableParams extends AbstractTableCommandParams {
     /**
      * Gets zone name.
      */
-    @Nullable
     public String zone() {
         return zone;
     }
@@ -126,8 +126,8 @@ public class CreateTableParams extends AbstractTableCommandParams {
          * @param zoneName Zone name.
          * @return {@code this}.
          */
-        public Builder zone(@Nullable String zoneName) {
-            params.zone = zoneName;
+        public Builder zone(String zoneName) {
+            params.zone = Objects.requireNonNullElse(zoneName, CatalogService.DEFAULT_ZONE_NAME);
 
             return this;
         }
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 8f67682499..3e3df85e39 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -191,7 +191,7 @@ public class DistributionZoneManager implements IgniteComponent {
     /** Tables configuration. */
     private final TablesConfiguration tablesConfiguration;
 
-    /** Catalog service. */
+    /** Catalog manager. */
     private final CatalogManager catalogManager;
 
     /** Meta Storage manager. */
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 89fee4b074..7e5caf9491 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.index;
 
 import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.util.ArrayUtils.STRING_EMPTY_ARRAY;
 
@@ -199,10 +200,9 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
         }
 
         try {
-            CompletableFuture<?> fireEventFuture = fireEvent(IndexEvent.DROP, new IndexEventParameters(causalityToken, tableId, idxId));
+            fireEvent(IndexEvent.DROP, new IndexEventParameters(causalityToken, tableId, idxId));
 
-            catalogManager.table(tableId, evt.catalogVersion());
-            CompletableFuture<?> dropIndexFuture = tableManager.tableAsync(tableId)
+            CompletableFuture<?> dropIndexFuture = tableManager.tableAsync(evt.causalityToken(), tableId)
                     .thenAccept(table -> {
                         if (table != null) { // in case of DROP TABLE the table will be removed first
                             table.unregisterIndex(idxId);
@@ -210,7 +210,7 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
                     });
 
             // TODO: investigate why DropIndexFuture hangs.
-            return allOf(fireEventFuture, dropIndexFuture);
+            return completedFuture(null); // dropIndexFuture;
         } catch (Throwable th) {
             LOG.warn("Failed to process drop index event.", th);
 
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
index 242c4fbdca..13350c0bd6 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.commands.CreateHashIndexParams;
 import org.apache.ignite.internal.catalog.commands.DropIndexParams;
 import org.apache.ignite.internal.index.event.IndexEvent;
@@ -72,7 +73,7 @@ public class ItIndexManagerTest extends ClusterPerClassIntegrationTest {
 
         await(indexManager.createIndexAsync(
                 CreateHashIndexParams.builder()
-                        .schemaName("PUBLIC")
+                        .schemaName(CatalogService.DEFAULT_SCHEMA_NAME)
                         .indexName("INAME")
                         .tableName("TNAME")
                         .columns(List.of("C3", "C2"))
@@ -93,7 +94,8 @@ public class ItIndexManagerTest extends ClusterPerClassIntegrationTest {
 
         CompletableFuture<IndexEventParameters> indexDroppedFuture = registerListener(indexManager, IndexEvent.DROP);
 
-        await(indexManager.dropIndexAsync(DropIndexParams.builder().schemaName("PUBLIC").indexName("INAME").build()));
+        await(indexManager.dropIndexAsync(DropIndexParams.builder()
+                .schemaName(CatalogService.DEFAULT_SCHEMA_NAME).indexName("INAME").build()));
 
         {
             IndexEventParameters params = await(indexDroppedFuture);
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index 2eac385ff1..d3597ed730 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -94,7 +94,8 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
     public SchemaManager(
             Consumer<LongFunction<CompletableFuture<?>>> registry,
             MetaStorageManager metastorageMgr,
-            CatalogManager catalogManager) {
+            CatalogManager catalogManager
+    ) {
         this.registriesVv = new IncrementalVersionedValue<>(registry, HashMap::new);
         this.catalogManager = catalogManager;
         this.metastorageMgr = metastorageMgr;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 697b7933bf..5c00af1fc3 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -26,7 +26,6 @@ import static java.util.concurrent.CompletableFuture.runAsync;
 import static java.util.concurrent.CompletableFuture.supplyAsync;
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
-import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_ZONE_NAME;
 import static org.apache.ignite.internal.causality.IncrementalVersionedValue.dependingOn;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.getZoneById;
 import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
@@ -98,6 +97,7 @@ import org.apache.ignite.internal.catalog.commands.ColumnParams;
 import org.apache.ignite.internal.catalog.commands.CreateTableParams;
 import org.apache.ignite.internal.catalog.commands.DropTableParams;
 import org.apache.ignite.internal.catalog.descriptors.CatalogDataStorageDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
@@ -614,10 +614,16 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
             assert !assignments.isEmpty() : "Couldn't create the table with empty assignments.";
 
+            int[] indexesToWait = Arrays.stream(catalogManager.schema(evt.catalogVersion()).indexes())
+                    .filter(i -> i.tableId() == tableId)
+                    .mapToInt(CatalogObjectDescriptor::id)
+                    .toArray();
+
             CompletableFuture<?> createTableFut = createTableLocally(
                     evt.causalityToken(),
                     tableDescriptor,
                     zoneDescriptor,
+                    indexesToWait,
                     assignments
             ).whenComplete((v, e) -> {
                 if (e == null) {
@@ -1231,12 +1237,14 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
      * @param causalityToken Causality token.
      * @param tableDescriptor Catalog table descriptor.
      * @param zoneDescriptor Catalog distributed zone descriptor.
+     * @param indexesToWait Identifiers of the indexes that belong to the table.
      * @return Future that will be completed when local changes related to the table creation are applied.
      */
     private CompletableFuture<?> createTableLocally(
             long causalityToken,
             CatalogTableDescriptor tableDescriptor,
             CatalogZoneDescriptor zoneDescriptor,
+            int[] indexesToWait,
             List<Set<Assignment>> assignments
     ) {
         String tableName = tableDescriptor.name();
@@ -1257,7 +1265,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         var table = new TableImpl(internalTable, lockMgr);
 
         // TODO: IGNITE-19082 Need another way to wait for indexes
-        table.addIndexesToWait(collectTableIndexIds(tableId));
+//        table.addIndexesToWait(indexesToWait);
 
         tablesByIdVv.update(causalityToken, (previous, e) -> inBusyLock(busyLock, () -> {
             if (e != null) {
@@ -1294,7 +1302,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         // TODO: https://issues.apache.org/jira/browse/IGNITE-19913 Possible performance degradation.
         fireEvent(TableEvent.CREATE, new TableEventParameters(causalityToken, tableId));
 
-        // TODO: investigate why createParts hangs.
+        // TODO: investigate why createParts future hangs.
         return completedFuture(false);
     }
 
@@ -1482,7 +1490,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         }
         try {
             String tableName = parameters.tableName();
-            String zoneName = Objects.requireNonNullElse(parameters.zone(), DEFAULT_ZONE_NAME);
+            String zoneName = parameters.zone();
 
             // Copied from DdlCommandHandler
             Consumer<TableChange> tblChanger = tableChange -> {
@@ -1492,19 +1500,14 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                     }
                 });
 
-                var colocationKeys = parameters.colocationColumns();
-
-                if (nullOrEmpty(colocationKeys)) {
-                    colocationKeys = parameters.primaryKeyColumns();
-                }
-
-                var colocationKeys0 = colocationKeys;
+                List<String> colocationKeys = nullOrEmpty(parameters.colocationColumns())
+                        ? parameters.primaryKeyColumns()
+                        : parameters.colocationColumns();
 
                 tableChange.changePrimaryKey(pkChange -> pkChange.changeColumns(parameters.primaryKeyColumns().toArray(String[]::new))
-                        .changeColocationColumns(colocationKeys0.toArray(String[]::new)));
+                        .changeColocationColumns(colocationKeys.toArray(String[]::new)));
             };
 
-
             return catalogManager.createTable(parameters)
                     .thenApply(ignore -> catalogManager.table(tableName, Long.MAX_VALUE).id())
                     .thenCompose(tableId -> createTableAsyncInternal(tableId, tableName, zoneName, tblChanger));
@@ -2721,13 +2724,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         }
     }
 
-    private int[] collectTableIndexIds(int tableId) {
-        return tablesCfg.value().indexes().stream()
-                .filter(tableIndexView -> tableIndexView.tableId() == tableId)
-                .mapToInt(TableIndexView::id)
-                .toArray();
-    }
-
     private static void closePartitionTrackers(InternalTable internalTable, int partitionId) {
         closeTracker(internalTable.getPartitionSafeTimeTracker(partitionId));