Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/02/21 16:30:45 UTC

[GitHub] [ignite-3] denis-chudov commented on a change in pull request #678: IGNITE-16377 Notification listeners of TableManager

denis-chudov commented on a change in pull request #678:
URL: https://github.com/apache/ignite-3/pull/678#discussion_r811053655



##########
File path: modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
##########
@@ -156,6 +156,10 @@ public synchronized void onTableCreated(
     ) {
         IgniteSchema schema = igniteSchemas.computeIfAbsent(schemaName, IgniteSchema::new);
 
+        if (table == null || table.schemaView() == null) {
+            System.out.println("");
+        }
+

Review comment:
       It seems that this block is not needed.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -1147,7 +1236,9 @@ public TableImpl table(UUID id) throws NodeStoppingException {
             return CompletableFuture.completedFuture(null);
         }
 
-        var tbl = tablesById.get(id);
+        Map<UUID, TableImpl> tablesById = tablesByIdVv.get().join();

Review comment:
       I am not sure that such calls are NPE-safe.
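
One way the concern could be addressed, as a minimal sketch: the `update` closures in this PR already handle `previous == null`, which suggests the versioned value may hold no map yet, so the read side could treat a null map as empty. `NullSafeLookup` and `lookup` are hypothetical names, and the plain `CompletableFuture` stands in for whatever `tablesByIdVv.get()` returns.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper; not part of TableManager. */
class NullSafeLookup {
    /** Waits for the map future and looks up the key, treating a null map as empty. */
    static <K, V> V lookup(CompletableFuture<Map<K, V>> mapFuture, K key) {
        Map<K, V> map = mapFuture.join();

        return map == null ? null : map.get(key);
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();

        // A future completed with null models a versioned value that holds no map yet.
        CompletableFuture<Map<UUID, String>> noMapYet = CompletableFuture.completedFuture(null);
        String missing = lookup(noMapYet, id);
        System.out.println("lookup on a null map: " + missing);

        Map<UUID, String> tables = new HashMap<>();
        tables.put(id, "table-1");
        CompletableFuture<Map<UUID, String>> ready = CompletableFuture.completedFuture(tables);
        String found = lookup(ready, id);
        System.out.println("lookup on a populated map: " + found);
    }
}
```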

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -540,16 +575,52 @@ private void createTableLocally(
                         schemaRegistry
                 );
 
-                tables.put(name, table);
-                tablesById.put(tblId, table);
+                tablesVv.update(causalityToken, previous -> {
+                    var val = previous == null ? new HashMap() : new HashMap<>(previous);
+
+                    val.put(name, table);
+
+                    return val;
+                }, th -> {
+                    throw new IgniteInternalException(IgniteStringFormatter.format("Cannot create a table [name={}, id={}]", name, tblId),
+                            th);
+                });
+
+                tablesByIdVv.update(causalityToken, previous -> {
+                    var val = previous == null ? new HashMap() : new HashMap<>(previous);
+
+                    val.put(tblId, table);
+
+                    return val;
+                }, th -> {
+                    throw new IgniteInternalException(IgniteStringFormatter.format("Cannot create a table [name={}, id={}]", name, tblId),
+                            th);
+                });
 
-                fireEvent(TableEvent.CREATE, new TableEventParameters(table), null);
+                completeApiCreateFuture(table);

Review comment:
       The signature of `update` allows async completion. Maybe we should rewrite these updates and the final call of `completeApiCreateFuture` via `thenCombine`? The same applies to `dropTableLocally`.
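
A rough sketch of the `thenCombine` idea, assuming each `update` call can be represented as a `CompletableFuture` (the quoted diff does not show what `update` returns, so this is an assumption). `CombineUpdatesSketch` and `completeAfterBoth` are hypothetical names; the two input futures stand in for the `tablesVv` and `tablesByIdVv` updates.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch; the input futures stand in for the results of the two update calls. */
class CombineUpdatesSketch {
    /**
     * Completes apiFuture with the table once both updates have completed normally,
     * or exceptionally if the combined stage fails.
     */
    static <A, B, T> CompletableFuture<Void> completeAfterBoth(
            CompletableFuture<A> byNameUpdate,
            CompletableFuture<B> byIdUpdate,
            T table,
            CompletableFuture<T> apiFuture
    ) {
        return byNameUpdate
                .thenCombine(byIdUpdate, (byName, byId) -> table)
                .handle((tbl, err) -> {
                    if (err != null) {
                        apiFuture.completeExceptionally(err);
                    } else {
                        apiFuture.complete(tbl);
                    }

                    return null;
                });
    }

    public static void main(String[] args) {
        CompletableFuture<String> byName = new CompletableFuture<>();
        CompletableFuture<Integer> byId = new CompletableFuture<>();
        CompletableFuture<String> api = new CompletableFuture<>();

        completeAfterBoth(byName, byId, "table-1", api);

        byName.complete("by-name map updated");
        System.out.println("done after first update: " + api.isDone());

        byId.complete(1);
        System.out.println("done after both updates: " + api.isDone());
    }
}
```

With this shape the API future is completed only after both maps have been updated, and a failure in either update reaches the caller instead of being lost.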

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -540,16 +575,52 @@ private void createTableLocally(
                         schemaRegistry
                 );
 
-                tables.put(name, table);
-                tablesById.put(tblId, table);
+                tablesVv.update(causalityToken, previous -> {
+                    var val = previous == null ? new HashMap() : new HashMap<>(previous);
+
+                    val.put(name, table);
+
+                    return val;
+                }, th -> {
+                    throw new IgniteInternalException(IgniteStringFormatter.format("Cannot create a table [name={}, id={}]", name, tblId),
+                            th);
+                });
+
+                tablesByIdVv.update(causalityToken, previous -> {
+                    var val = previous == null ? new HashMap() : new HashMap<>(previous);
+
+                    val.put(tblId, table);
+
+                    return val;
+                }, th -> {
+                    throw new IgniteInternalException(IgniteStringFormatter.format("Cannot create a table [name={}, id={}]", name, tblId),
+                            th);
+                });
 
-                fireEvent(TableEvent.CREATE, new TableEventParameters(table), null);
+                completeApiCreateFuture(table);
+
+                fireEvent(TableEvent.CREATE, new TableEventParameters(causalityToken, table), null);
             } catch (Exception e) {
-                fireEvent(TableEvent.CREATE, new TableEventParameters(tblId, name), e);
+                fireEvent(TableEvent.CREATE, new TableEventParameters(causalityToken, tblId, name), e);
             }
         }).join();
     }
 
+    /**
+     * Completes appropriate future to return result from API {@link TableManager#createTable(String, Consumer)}.
+     *
+     * @param table Table.
+     */
+    private void completeApiCreateFuture(TableImpl table) {
+        CompletableFuture<Table> tblFut = tableCreateFuts.get(table.tableId());
+
+        if (tblFut != null) {
+            tblFut.complete(table);
+
+            tableCreateFuts.values().removeIf(fut -> fut == tblFut);
+        }
+    }

Review comment:
       What about using `tableCreateFuts.remove` to get the future?
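
A minimal sketch of that suggestion, assuming `tableCreateFuts` is a concurrent map keyed by table id (the diff does not show its declaration). `PendingCreates` and its methods are hypothetical stand-ins, and `String` replaces the table type to keep the example self-contained.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for the part of TableManager that tracks pending create futures. */
class PendingCreates {
    private final ConcurrentMap<UUID, CompletableFuture<String>> tableCreateFuts = new ConcurrentHashMap<>();

    /** Registers a future that the API caller waits on. */
    CompletableFuture<String> register(UUID tableId) {
        CompletableFuture<String> fut = new CompletableFuture<>();
        tableCreateFuts.put(tableId, fut);

        return fut;
    }

    /** Fetches and removes the pending future in one call, then completes it if it was present. */
    boolean completeApiCreateFuture(UUID tableId, String table) {
        CompletableFuture<String> fut = tableCreateFuts.remove(tableId);

        return fut != null && fut.complete(table);
    }

    int pendingCount() {
        return tableCreateFuts.size();
    }

    public static void main(String[] args) {
        PendingCreates pending = new PendingCreates();
        UUID id = UUID.randomUUID();

        CompletableFuture<String> fut = pending.register(id);
        pending.completeApiCreateFuture(id, "table-1");

        System.out.println("completed: " + fut.isDone() + ", still pending: " + pending.pendingCount());
    }
}
```

Compared with `get` followed by `values().removeIf(...)`, a single `remove(key)` avoids scanning every value and leaves no window between reading the entry and removing it.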

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -235,41 +244,52 @@ private void onTableCreateInternal(ConfigurationNotificationEvent<TableView> ctx
                                 .listenElements(new ConfigurationNamedListListener<>() {
                                     @Override
                                     public CompletableFuture<?> onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
+                                        long causalityToken = schemasCtx.storageRevision();
+
                                         if (!busyLock.enterBusy()) {
                                             fireEvent(
                                                     TableEvent.ALTER,
-                                                    new TableEventParameters(tblId, tblName),
+                                                    new TableEventParameters(schemasCtx.storageRevision(), tblId, tblName),

Review comment:
       Please replace `schemasCtx.storageRevision()` with the `causalityToken` declared above, as in the other places where `fireEvent` is called.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -708,50 +797,50 @@ public Table createTable(String name, Consumer<TableChange> tableInitChange) {
                     }
 
                     change.create(name, (ch) -> {
-                                tableInitChange.accept(ch);
-
-                                ((ExtendedTableChange) ch)
-                                        // Affinity assignments calculation.
-                                        .changeAssignments(ByteUtils.toBytes(AffinityUtils.calculateAssignments(
-                                                baselineMgr.nodes(),
-                                                ch.partitions(),
-                                                ch.replicas())))
-                                        // Table schema preparation.
-                                        .changeSchemas(schemasCh -> schemasCh.create(
-                                                String.valueOf(INITIAL_SCHEMA_VERSION),
-                                                schemaCh -> {
-                                                    SchemaDescriptor schemaDesc;
-
-                                                    //TODO IGNITE-15747 Remove try-catch and force configuration
-                                                    // validation here to ensure a valid configuration passed to
-                                                    // prepareSchemaDescriptor() method.
-                                                    try {
-                                                        schemaDesc = SchemaUtils.prepareSchemaDescriptor(
-                                                                ((ExtendedTableView) ch).schemas().size(),
-                                                                ch);
-                                                    } catch (IllegalArgumentException ex) {
-                                                        throw new ConfigurationValidationException(ex.getMessage());
-                                                    }
-
-                                                    schemaCh.changeSchema(SchemaSerializerImpl.INSTANCE.serialize(schemaDesc));
-                                                }
-                                        ));
-                            }
-                    );
-                }).whenComplete((res, t) -> {
-                    if (t != null) {
-                        Throwable ex = getRootCause(t);
+                        tableInitChange.accept(ch);
+
+                        var extConfCh = ((ExtendedTableChange) ch);
+
+                        tableCreateFuts.put(extConfCh.id(), tblFut);
+
+                        // Affinity assignments calculation.
+                        extConfCh.changeAssignments(ByteUtils.toBytes(AffinityUtils.calculateAssignments(
+                                        baselineMgr.nodes(),
+                                        ch.partitions(),
+                                        ch.replicas())))
+                                // Table schema preparation.
+                                .changeSchemas(schemasCh -> schemasCh.create(
+                                        String.valueOf(INITIAL_SCHEMA_VERSION),
+                                        schemaCh -> {
+                                            SchemaDescriptor schemaDesc;
+
+                                            //TODO IGNITE-15747 Remove try-catch and force configuration
+                                            // validation here to ensure a valid configuration passed to
+                                            // prepareSchemaDescriptor() method.
+                                            try {
+                                                schemaDesc = SchemaUtils.prepareSchemaDescriptor(
+                                                        ((ExtendedTableView) ch).schemas().size(),
+                                                        ch);
+                                            } catch (IllegalArgumentException ex) {
+                                                throw new ConfigurationValidationException(ex.getMessage());
+                                            }
 
-                        if (ex instanceof TableAlreadyExistsException) {
-                            tblFut.completeExceptionally(ex);
-                        } else {
-                            LOG.error(IgniteStringFormatter.format("Table wasn't created [name={}]", name), ex);
+                                            schemaCh.changeSchema(SchemaSerializerImpl.INSTANCE.serialize(schemaDesc));
+                                        }
+                                ));
+                    });
+                }).exceptionally(t -> {
+                    Throwable ex = getRootCause(t);
 
-                            tblFut.completeExceptionally(ex);
-                        }
+                    if (ex instanceof TableAlreadyExistsException) {
+                        tblFut.completeExceptionally(ex);
                     } else {
-                        tblFut.complete(tables.get(name));
+                        LOG.error(IgniteStringFormatter.format("Table wasn't created [name={}]", name), ex);
+
+                        tblFut.completeExceptionally(ex);

Review comment:
       `tblFut` should also be removed from `tableCreateFuts`.
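
A minimal sketch of the cleanup on the failure path. In the quoted diff only `tblFut` appears to be in scope inside the `exceptionally` handler, so the entry is removed by value here. `CreateFailureCleanup` is a hypothetical class, `configChange` stands in for the configuration change future, and `tableCreateFuts` is assumed to be a concurrent map.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical sketch of removing the pending future when table creation fails. */
class CreateFailureCleanup {
    private final ConcurrentMap<UUID, CompletableFuture<String>> tableCreateFuts = new ConcurrentHashMap<>();

    CompletableFuture<String> createTable(UUID tableId, CompletableFuture<Void> configChange) {
        CompletableFuture<String> tblFut = new CompletableFuture<>();
        tableCreateFuts.put(tableId, tblFut);

        configChange.exceptionally(t -> {
            // Drop the map entry so a failed creation does not leave a stale future behind.
            tableCreateFuts.values().remove(tblFut);
            tblFut.completeExceptionally(t);

            return null;
        });

        return tblFut;
    }

    int pendingCount() {
        return tableCreateFuts.size();
    }

    public static void main(String[] args) {
        CreateFailureCleanup mgr = new CreateFailureCleanup();
        CompletableFuture<Void> configChange = new CompletableFuture<>();

        CompletableFuture<String> tblFut = mgr.createTable(UUID.randomUUID(), configChange);
        configChange.completeExceptionally(new IllegalStateException("table already exists"));

        System.out.println("failed: " + tblFut.isCompletedExceptionally() + ", still pending: " + mgr.pendingCount());
    }
}
```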



