You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/09/10 16:42:09 UTC

[GitHub] [ignite-3] sanpwc opened a new pull request #330: IGNITE-15404

sanpwc opened a new pull request #330:
URL: https://github.com/apache/ignite-3/pull/330


   TBD


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r709411689



##########
File path: modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityServiceTest.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests scenarios for an affinity service. Please pay attention that given test doesn't check Rendezvous or any other
+ * affinity function it just checks {@link AffinityService} logic.
+ */
+public class AffinityServiceTest {
+    /**
+     *
+     */
+    @Test
+    public void testCalculatedAssignmentHappyPath() {
+        List<List<ClusterNode>> assignments = AffinityService.calculateAssignments(
+            Arrays.asList(
+                new ClusterNode(
+                    UUID.randomUUID().toString(), "node0",
+                    new NetworkAddress("localhost", 8080)
+                ),
+                new ClusterNode(
+                    UUID.randomUUID().toString(), "node1",
+                    new NetworkAddress("localhost", 8081)
+                )
+

Review comment:
       Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r707418957



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/mapping/ColumnMapper.java
##########
@@ -24,6 +24,7 @@
 /**
  * Column mapper interface.
  */
+// TODO: IGNITE-15480 Probably should be removed after index column mapping refactoring.

Review comment:
       I mean that index should be mapped to columns by column_id's that will be a part of internal configuration extension. column_id's will be final, so column renaming won't effect column-to-index mapping. I'll provide IGNITE-15480 description in a few days.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] AMashenkov commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
AMashenkov commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r707402281



##########
File path: modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityService.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Stateless affinity service that produces helper methods for an affinity assignments calculation.
+ */
+public class AffinityService {
+    /**
+     * Calculates affinity assignments.
+     *
+     * @param partitions Partitions count.
+     * @param replicas Replicas count.
+     * @return List nodes by partition.
+     */
+    public static List<List<ClusterNode>> calculateAssignments(

Review comment:
       What does service actually do?
   Let's make method non-static or rename class to AffinityUtils?
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r709401399



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -123,42 +122,143 @@
     private final Map<String, TableImpl> tables = new ConcurrentHashMap<>();
 
     /** Tables. */
-    private final Map<UUID, TableImpl> tablesById = new ConcurrentHashMap<>();
+    private final Map<IgniteUuid, TableImpl> tablesById = new ConcurrentHashMap<>();
+
+    /**
+     * tableId -> future that adds an ability to await distributed table creation on local node from within the context
+     * of user's table creation intention.
+     *
+     * In other words, awaiting local {@link TableManager#createTableLocally(String, IgniteUuid, List, SchemaDescriptor)}
+     * from within {@link TableManager#createTableAsync(String, Consumer, boolean)}
+     */
+    private final Map<IgniteUuid, CompletableFuture<Table>> createTblIntention = new ConcurrentHashMap<>();
+
+    /**
+     * tableId -> future that adds an ability to await distributed table alter on local node from within the context
+     * of user's table alter intention.
+     *
+     * In other words, awaiting local alter table as a reaction on distributed event
+     * from within {@link TableManager#createTableAsync(String, Consumer, boolean)}
+     */
+    private final Map<IgniteUuid, CompletableFuture<Void>> alterTblIntention = new ConcurrentHashMap<>();
+
+    /**
+     * tableId -> future that adds an ability to await distributed table drop on local node from within the context
+     * of user's table drop intention.
+     *
+     * In other words, awaiting local {@link TableManager#dropTableLocally(String, IgniteUuid, List)}
+     * from within {@link TableManager#dropTableAsync(String)}
+     */
+    private final Map<IgniteUuid, CompletableFuture<Void>> dropTblIntention = new ConcurrentHashMap<>();
 
     /**
      * Creates a new table manager.
      *
-     * @param nodeCfgMgr Node configuration manager.
      * @param clusterCfgMgr Cluster configuration manager.
-     * @param metaStorageMgr Meta storage manager.
-     * @param schemaMgr Schema manager.
-     * @param affMgr Affinity manager.
      * @param raftMgr Raft manager.
+     * @param baselineMgr Baseline manager.
+     * @param metaStorageMgr Meta storage manager.
      * @param partitionsStoreDir Partitions store directory.
      */
     public TableManager(
-        ConfigurationManager nodeCfgMgr,
         ConfigurationManager clusterCfgMgr,
-        MetaStorageManager metaStorageMgr,
-        SchemaManager schemaMgr,
-        AffinityManager affMgr,
         Loza raftMgr,
+        BaselineManager baselineMgr,
+        MetaStorageManager metaStorageMgr,
         Path partitionsStoreDir
     ) {
-        this.nodeCfgMgr = nodeCfgMgr;
         this.clusterCfgMgr = clusterCfgMgr;
-        this.metaStorageMgr = metaStorageMgr;
-        this.affMgr = affMgr;
         this.raftMgr = raftMgr;
-        this.schemaMgr = schemaMgr;
+        this.baselineMgr = baselineMgr;
+        this.metaStorageMgr = metaStorageMgr;
         this.partitionsStoreDir = partitionsStoreDir;
     }
 
     /** {@inheritDoc} */
     @Override public void start() {
-        //TODO: IGNITE-14652 Change a metastorage update in listeners to multi-invoke
-        clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().listen(ctx -> {
-            return onConfigurationChanged(ctx.storageRevision(), ctx.oldValue(), ctx.newValue());
+        clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().
+            listenElements(new ConfigurationNamedListListener<TableView>() {
+            @Override
+            public @NotNull CompletableFuture<?> onCreate(@NotNull ConfigurationNotificationEvent<TableView> ctx) {
+                // Empty assignments might be a valid case if tables are created from within cluster init HOCON
+                // configuration, which is not supported now.
+                assert ((ExtendedTableView)ctx.newValue()).assignments() != null :
+                    "Table =[" + ctx.newValue().name() + "] has empty assignments.";
+
+                final IgniteUuid tblId = IgniteUuid.fromString(((ExtendedTableView)ctx.newValue()).id());

Review comment:
       That's correct, but ctx.newValue() might be null only in case of drop event Wihtin onCreate it'll always be not null (and here we are in onCreate()). It's actually a matter or common ConfigurationNotificationEvent interface: in case of create oldValue is null, in case of drop - new one.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] AMashenkov commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
AMashenkov commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r708096914



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/mapping/ColumnMapper.java
##########
@@ -24,6 +24,7 @@
 /**
  * Column mapper interface.
  */
+// TODO: IGNITE-15480 Probably should be removed after index column mapping refactoring.

Review comment:
       Assume there is a row R with schema V1.
   Then you update schema to V2 and add a column C with a default D1.
   Then you update schema to V3 and update default to D2.
   
   Now, you read R, which is stored as of V1 version, and upgrade it to V3. 
   On one side, R doesn't contains C column, and there is no descriptor for column C in V1.
   On another side, you can't use column C descriptor from V3 because it was changed in between.
   Using column C descriptor from V2 will be correct, but you will need to go through schema history for that for every row and every column you read just to check if there is any history.
   
   Column mapper merges intermediate changes once and then get reused.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] zstan commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
zstan commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r707483586



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -566,52 +423,84 @@ private void dropTableLocally(String name, UUID tblId, List<List<ClusterNode>> a
      * {@code false} means the existing table will be returned.
      * @return A table instance.
      */
-    private CompletableFuture<Table> createTableAsync(String name, Consumer<TableChange> tableInitChange, boolean exceptionWhenExist) {
+    private CompletableFuture<Table> createTableAsync(
+        String name,
+        Consumer<TableChange> tableInitChange,
+        boolean exceptionWhenExist
+    ) {
         CompletableFuture<Table> tblFut = new CompletableFuture<>();
 
-        EventListener<TableEventParameters> clo = new EventListener<>() {
-            @Override public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) {
-                String tableName = parameters.tableName();
-
-                if (!name.equals(tableName))
-                    return false;
-
-                if (e == null)
-                    tblFut.complete(parameters.table());
-                else
-                    tblFut.completeExceptionally(e);
-
-                return true;
-            }
-
-            @Override public void remove(@NotNull Throwable e) {
-                tblFut.completeExceptionally(e);
-            }
-        };
-
-        listen(TableEvent.CREATE, clo);
-
         tableAsync(name, true).thenAccept(tbl -> {
             if (tbl != null) {
                 if (exceptionWhenExist) {
-                    removeListener(TableEvent.CREATE, clo, new IgniteInternalCheckedException(
-                            LoggerMessageHelper.format("Table already exists [name={}]", name)));
-                } else if (tblFut.complete(tbl))
-                    removeListener(TableEvent.CREATE, clo);
-            } else {
-                try {
-                    clusterCfgMgr
-                        .configurationRegistry()
-                        .getConfiguration(TablesConfiguration.KEY)
-                        .tables()
-                        .change(change -> change.create(name, tableInitChange))
-                        .get();
+                    tblFut.completeExceptionally(new IgniteInternalCheckedException(
+                        LoggerMessageHelper.format("Table already exists [name={}]", name)));
                 }
-                catch (InterruptedException | ExecutionException e) {
-                    LOG.error("Table wasn't created [name=" + name + ']', e);
+                else
+                    tblFut.complete(tbl);
+            }
+            else {
+                IgniteUuid tblId = TABLE_ID_GENERATOR.randomUuid();
 
-                    removeListener(TableEvent.CREATE, clo, new IgniteInternalCheckedException(e));
-                }
+                createTblIntention.put(tblId, new CompletableFuture<>());
+
+                clusterCfgMgr
+                    .configurationRegistry()
+                    .getConfiguration(TablesConfiguration.KEY)
+                    .tables()
+                    .change(
+                        change -> change.create(
+                            name,
+                            (ch) -> {
+                                tableInitChange.accept(ch);
+                                ((ExtendedTableChange)ch).
+                                    // Table id specification.
+                                    changeId(tblId.toString()).
+                                    // Affinity assignments calculation.
+                                    changeAssignments(
+                                        ByteUtils.toBytes(
+                                            AffinityService.calculateAssignments(
+                                                baselineMgr.nodes(),
+                                                ch.partitions(),
+                                                ch.replicas()
+                                            )
+                                        )
+                                    ).
+                                    // Table schema preparation.
+                                    changeSchemas(
+                                        schemasCh -> schemasCh.create(
+                                            String.valueOf(INITIAL_SCHEMA_VERSION),
+                                            schemaCh -> schemaCh.changeSchema(
+                                                ByteUtils.toBytes(
+                                                    SchemaService.prepareSchemaDescriptor(
+                                                        ((ExtendedTableView)ch).schemas().size(),
+                                                        ch
+                                                    )
+                                                )
+                                            )
+                                        )
+                                    );
+                            }
+                        )
+                    )
+                    .thenRun(() -> createTblIntention.get(tblId).thenApply(tblFut::complete)
+                        .thenRun(() -> createTblIntention.remove(tblId))
+                        .exceptionally(throwable -> {
+                            createTblIntention.remove(tblId);
+
+                            tblFut.completeExceptionally(new IgniteException(throwable));
+
+                            return null;
+                        }))
+                    .exceptionally(throwable -> {
+                        LOG.error("Table wasn't created [name=" + name + ']', throwable);

Review comment:
       i think it more helpful to use : LoggerMessageHelper.format instead of all these "]" concatenations, wdyt ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] AMashenkov commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
AMashenkov commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r707405472



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/mapping/ColumnMapper.java
##########
@@ -24,6 +24,7 @@
 /**
  * Column mapper interface.
  */
+// TODO: IGNITE-15480 Probably should be removed after index column mapping refactoring.

Review comment:
       What do you mean?
   The ticket has empty description.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r708610725



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -123,42 +122,143 @@
     private final Map<String, TableImpl> tables = new ConcurrentHashMap<>();
 
     /** Tables. */
-    private final Map<UUID, TableImpl> tablesById = new ConcurrentHashMap<>();
+    private final Map<IgniteUuid, TableImpl> tablesById = new ConcurrentHashMap<>();
+
+    /**
+     * tableId -> future that adds an ability to await distributed table creation on local node from within the context
+     * of user's table creation intention.
+     *
+     * In other words, awaiting local {@link TableManager#createTableLocally(String, IgniteUuid, List, SchemaDescriptor)}
+     * from within {@link TableManager#createTableAsync(String, Consumer, boolean)}
+     */
+    private final Map<IgniteUuid, CompletableFuture<Table>> createTblIntention = new ConcurrentHashMap<>();
+
+    /**
+     * tableId -> future that adds an ability to await distributed table alter on local node from within the context
+     * of user's table alter intention.
+     *
+     * In other words, awaiting local alter table as a reaction on distributed event
+     * from within {@link TableManager#createTableAsync(String, Consumer, boolean)}
+     */
+    private final Map<IgniteUuid, CompletableFuture<Void>> alterTblIntention = new ConcurrentHashMap<>();
+
+    /**
+     * tableId -> future that adds an ability to await distributed table drop on local node from within the context
+     * of user's table drop intention.
+     *
+     * In other words, awaiting local {@link TableManager#dropTableLocally(String, IgniteUuid, List)}
+     * from within {@link TableManager#dropTableAsync(String)}
+     */
+    private final Map<IgniteUuid, CompletableFuture<Void>> dropTblIntention = new ConcurrentHashMap<>();
 
     /**
      * Creates a new table manager.
      *
-     * @param nodeCfgMgr Node configuration manager.
      * @param clusterCfgMgr Cluster configuration manager.
-     * @param metaStorageMgr Meta storage manager.
-     * @param schemaMgr Schema manager.
-     * @param affMgr Affinity manager.
      * @param raftMgr Raft manager.
+     * @param baselineMgr Baseline manager.
+     * @param metaStorageMgr Meta storage manager.
      * @param partitionsStoreDir Partitions store directory.
      */
     public TableManager(
-        ConfigurationManager nodeCfgMgr,
         ConfigurationManager clusterCfgMgr,
-        MetaStorageManager metaStorageMgr,
-        SchemaManager schemaMgr,
-        AffinityManager affMgr,
         Loza raftMgr,
+        BaselineManager baselineMgr,
+        MetaStorageManager metaStorageMgr,
         Path partitionsStoreDir
     ) {
-        this.nodeCfgMgr = nodeCfgMgr;
         this.clusterCfgMgr = clusterCfgMgr;
-        this.metaStorageMgr = metaStorageMgr;
-        this.affMgr = affMgr;
         this.raftMgr = raftMgr;
-        this.schemaMgr = schemaMgr;
+        this.baselineMgr = baselineMgr;
+        this.metaStorageMgr = metaStorageMgr;
         this.partitionsStoreDir = partitionsStoreDir;
     }
 
     /** {@inheritDoc} */
     @Override public void start() {
-        //TODO: IGNITE-14652 Change a metastorage update in listeners to multi-invoke
-        clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().listen(ctx -> {
-            return onConfigurationChanged(ctx.storageRevision(), ctx.oldValue(), ctx.newValue());
+        clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().
+            listenElements(new ConfigurationNamedListListener<TableView>() {
+            @Override
+            public @NotNull CompletableFuture<?> onCreate(@NotNull ConfigurationNotificationEvent<TableView> ctx) {
+                // Empty assignments might be a valid case if tables are created from within cluster init HOCON
+                // configuration, which is not supported now.
+                assert ((ExtendedTableView)ctx.newValue()).assignments() != null :
+                    "Table =[" + ctx.newValue().name() + "] has empty assignments.";
+
+                final IgniteUuid tblId = IgniteUuid.fromString(((ExtendedTableView)ctx.newValue()).id());
+
+                // TODO: IGNITE-15409 Listener with any placeholder should be used instead.
+                ((ExtendedTableConfiguration) clusterCfgMgr.configurationRegistry().
+                    getConfiguration(TablesConfiguration.KEY).tables().get(ctx.newValue().name())).schemas().
+                    listenElements(new ConfigurationNamedListListener<>() {

Review comment:
       If you subscribe to the concrete table, you have to remove this subscription on drop.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r709414763



##########
File path: modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
##########
@@ -82,15 +83,18 @@ public Loza(ClusterService clusterNetSvc, Path dataPath) {
      * @param lsnr Raft group listener.
      * @return Future representing pending completion of the operation.
      */
-    public CompletableFuture<RaftGroupService> prepareRaftGroup(String groupId, List<ClusterNode> nodes, RaftGroupListener lsnr) {
+    public CompletableFuture<RaftGroupService> prepareRaftGroup(
+        String groupId,
+        List<ClusterNode> nodes,
+        Supplier<RaftGroupListener> lsnrSupplier) {

Review comment:
       Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] zstan commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
zstan commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r707383403



##########
File path: modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
##########
@@ -325,6 +326,33 @@ public ClientMessagePacker packUuid(UUID val) {
         return this;
     }
 
+    /**
+     * Writes an {@link IgniteUuid}.
+     *
+     * @param val {@link IgniteUuid} value.
+     * @return This instance.
+     */
+    public ClientMessagePacker packIgniteUuid(IgniteUuid val) {

Review comment:
       and unpackIgniteUuid need to be utility methods, i hope.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] zstan commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
zstan commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r707954244



##########
File path: modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
##########
@@ -325,6 +326,33 @@ public ClientMessagePacker packUuid(UUID val) {
         return this;
     }
 
+    /**
+     * Writes an {@link IgniteUuid}.
+     *
+     * @param val {@link IgniteUuid} value.
+     * @return This instance.
+     */
+    public ClientMessagePacker packIgniteUuid(IgniteUuid val) {
+        assert !closed : "Packer is closed";
+
+        packExtensionTypeHeader(ClientMsgPackType.IGNITE_UUID, 24);
+
+        // TODO: Pack directly to ByteBuf without allocating IGNITE-15234.
+        var bytes = new byte[24];
+        ByteBuffer bb = ByteBuffer.wrap(bytes);
+
+        UUID globalId = val.globalId();
+
+        bb.putLong(globalId.getMostSignificantBits());
+        bb.putLong(globalId.getLeastSignificantBits());
+
+        bb.putLong(val.localId());
+
+        writePayload(bytes);
+
+        return this;

Review comment:
       ```suggestion
           ByteBuffer bb = ByteBuffer.allocate(24);
   
           UUID globalId = val.globalId();
   
           bb.putLong(globalId.getMostSignificantBits());
           bb.putLong(globalId.getLeastSignificantBits());
   
           bb.putLong(val.localId());
   
           bb.flip();
   
           writePayload(bb.array());
   ```
   why do we need byte[] wrapping ? more compact ^




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] zstan commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
zstan commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r707954244



##########
File path: modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
##########
@@ -325,6 +326,33 @@ public ClientMessagePacker packUuid(UUID val) {
         return this;
     }
 
+    /**
+     * Writes an {@link IgniteUuid}.
+     *
+     * @param val {@link IgniteUuid} value.
+     * @return This instance.
+     */
+    public ClientMessagePacker packIgniteUuid(IgniteUuid val) {
+        assert !closed : "Packer is closed";
+
+        packExtensionTypeHeader(ClientMsgPackType.IGNITE_UUID, 24);
+
+        // TODO: Pack directly to ByteBuf without allocating IGNITE-15234.
+        var bytes = new byte[24];
+        ByteBuffer bb = ByteBuffer.wrap(bytes);
+
+        UUID globalId = val.globalId();
+
+        bb.putLong(globalId.getMostSignificantBits());
+        bb.putLong(globalId.getLeastSignificantBits());
+
+        bb.putLong(val.localId());
+
+        writePayload(bytes);
+
+        return this;

Review comment:
       ```suggestion
           ByteBuffer bb = ByteBuffer.allocate(24);
   
           UUID globalId = val.globalId();
   
           bb.putLong(globalId.getMostSignificantBits());
           bb.putLong(globalId.getLeastSignificantBits());
   
           bb.putLong(val.localId());
   
           bb.flip();
   
           writePayload(bb.array());
   ```
   why do we need byte[] wrapping ? = additional inner object creation




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] zstan commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
zstan commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r707498223



##########
File path: modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
##########
@@ -325,6 +326,33 @@ public ClientMessagePacker packUuid(UUID val) {
         return this;
     }
 
+    /**
+     * Writes an {@link IgniteUuid}.
+     *
+     * @param val {@link IgniteUuid} value.
+     * @return This instance.
+     */
+    public ClientMessagePacker packIgniteUuid(IgniteUuid val) {

Review comment:
       no i mean that it would be helpful to have a public static ByteBuffer packIgniteUuid(IgniteUuid val) method. Or it can`t be reused from some other places ? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] AMashenkov commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
AMashenkov commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r707450444



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -123,42 +122,143 @@
     private final Map<String, TableImpl> tables = new ConcurrentHashMap<>();
 
     /** Tables. */
-    private final Map<UUID, TableImpl> tablesById = new ConcurrentHashMap<>();
+    private final Map<IgniteUuid, TableImpl> tablesById = new ConcurrentHashMap<>();
+
+    /**
+     * tableId -> future that adds an ability to await distributed table creation on local node from within the context
+     * of user's table creation intention.
+     *
+     * In other words, awaiting local {@link TableManager#createTableLocally(String, IgniteUuid, List, SchemaDescriptor)}
+     * from within {@link TableManager#createTableAsync(String, Consumer, boolean)}
+     */
+    private final Map<IgniteUuid, CompletableFuture<Table>> createTblIntention = new ConcurrentHashMap<>();

Review comment:
       Seems, we can have only one intention for every single table at any time: either to add, drop or alter, but not any combination.
   Maybe we can extend Completable future and enrich it with additional informaion?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r708371761



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/configuration/schema/ExtendedTableConfigurationSchema.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.configuration.schema;
+
+import org.apache.ignite.configuration.annotation.InternalConfiguration;
+import org.apache.ignite.configuration.annotation.NamedConfigValue;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.schemas.table.TableConfigurationSchema;
+import org.apache.ignite.configuration.validation.Immutable;
+
+/**
+ * Extended table configuration schema class.
+ */
+@InternalConfiguration
+public class ExtendedTableConfigurationSchema extends TableConfigurationSchema {
+    /** Table id. String representation of {@link org.apache.ignite.lang.IgniteUuid}. */
+    @Value
+    @Immutable
+    public String id;

Review comment:
       Neither IgniteUuid nor UUID are not supported in configuration.
   For the same reason byte array is used for affinity assignments - neither collections nor array of arrays are supported. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r708427305



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -173,63 +273,83 @@ public TableManager(
      * @param name Table name.
      * @param tblId Table id.
      * @param assignment Affinity assignment.
-     * @param schemaReg Schema registry for the table.
      */
     private void createTableLocally(
         String name,
-        UUID tblId,
+        IgniteUuid tblId,
         List<List<ClusterNode>> assignment,
-        SchemaRegistry schemaReg
+        SchemaDescriptor schemaDesc
     ) {
         int partitions = assignment.size();
 
         var partitionsGroupsFutures = new ArrayList<CompletableFuture<RaftGroupService>>();
 
-        Path storageDir = partitionsStoreDir.resolve(name);
+        IntStream.range(0, partitions).forEach(p ->
+            partitionsGroupsFutures.add(
+                raftMgr.prepareRaftGroup(
+                    raftGroupName(tblId, p),
+                    assignment.get(p),
+                    () -> {
+                        Path storageDir = partitionsStoreDir.resolve(name);
+
+                        try {
+                            Files.createDirectories(storageDir);
+                        }
+                        catch (IOException e) {
+                            throw new IgniteInternalException(
+                                "Failed to create partitions store directory for " + name + ": " + e.getMessage(),
+                                e
+                            );
+                        }
+
+                        return new PartitionListener(
+                            new RocksDbStorage(
+                                storageDir.resolve(String.valueOf(p)),
+                                ByteBuffer::compareTo
+                            )
+                        );
+                    }
+                )
+            )
+        );
 
-        try {
-            Files.createDirectories(storageDir);
-        } catch (IOException e) {
-            throw new IgniteInternalException(
-                "Failed to create partitions store directory for " + name + ": " + e.getMessage(),
-                e
-            );
-        }
+        CompletableFuture.allOf(partitionsGroupsFutures.toArray(CompletableFuture[]::new)).thenRun(() -> {
+            try {
+                HashMap<Integer, RaftGroupService> partitionMap = new HashMap<>(partitions);
 
-        for (int p = 0; p < partitions; p++) {
-            RocksDbStorage storage = new RocksDbStorage(
-                storageDir.resolve(String.valueOf(p)),
-                ByteBuffer::compareTo
-            );
-
-            partitionsGroupsFutures.add(raftMgr.prepareRaftGroup(
-                raftGroupName(tblId, p),
-                assignment.get(p),
-                new PartitionListener(storage)
-            ));
-        }
+                for (int p = 0; p < partitions; p++) {
+                    CompletableFuture<RaftGroupService> future = partitionsGroupsFutures.get(p);
 
-        CompletableFuture.allOf(partitionsGroupsFutures.toArray(CompletableFuture[]::new)).thenRun(() -> {
-            HashMap<Integer, RaftGroupService> partitionMap = new HashMap<>(partitions);
+                    assert future.isDone();
 
-            for (int p = 0; p < partitions; p++) {
-                CompletableFuture<RaftGroupService> future = partitionsGroupsFutures.get(p);
+                    RaftGroupService service = future.join();
 
-                assert future.isDone();
+                    partitionMap.put(p, service);
+                }
 
-                RaftGroupService service = future.join();
+                InternalTableImpl internalTable = new InternalTableImpl(name, tblId, partitionMap, partitions);
 
-                partitionMap.put(p, service);
-            }
+                var schemaRegistry = new SchemaRegistryImpl(v -> schemaDesc);
 
-            InternalTableImpl internalTable = new InternalTableImpl(name, tblId, partitionMap, partitions);
+                schemaRegistry.onSchemaRegistered(schemaDesc);
+
+                var table = new TableImpl(
+                    internalTable,
+                    schemaRegistry,
+                    TableManager.this,
+                    null
+                );
 
-            var table = new TableImpl(internalTable, schemaReg, this, null);
+                tables.put(name, table);
+                tablesById.put(tblId, table);
 
-            tables.put(name, table);
-            tablesById.put(table.tableId(), table);
+                fireEvent(TableEvent.CREATE, new TableEventParameters(table), null);
 
-            onEvent(TableEvent.CREATE, new TableEventParameters(table), null);
+                Optional.ofNullable(createTblIntention.get(tblId)).ifPresent(f -> f.complete(table));
+            }
+            catch (Exception e) {

Review comment:
       Need to fire create table event with the exception here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] AMashenkov commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
AMashenkov commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r710841121



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+import java.util.Optional;
+import org.apache.ignite.configuration.schemas.table.ColumnView;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
+import org.apache.ignite.internal.schema.configuration.SchemaDescriptorConverter;
+import org.apache.ignite.internal.schema.mapping.ColumnMapper;
+import org.apache.ignite.internal.schema.mapping.ColumnMapping;
+import org.apache.ignite.lang.LoggerMessageHelper;
+import org.apache.ignite.schema.SchemaTable;
+
+/**
+ * Stateless schema utils that produces helper methods for schema preparation.
+ * <p>
+ * Schemas itself MUST be registered in a version ascending order incrementing by {@code 1} with NO gaps,
+ * otherwise an exception will be thrown. The version numbering starts from the {@code 1}.
+ * <p>
+ * After some table maintenance process some first versions may become outdated and can be safely cleaned up
+ * if the process guarantees the table no longer has a data of these versions.
+ *
+ * @implSpec The changes in between two arbitrary actual versions MUST NOT be lost.
+ * Thus, schema versions can only be removed from the beginning.
+ * @implSpec Initial schema history MAY be registered without the first outdated versions
+ * that could be cleaned up earlier.

Review comment:
       Let's move javadoc to SchemaRegistry.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r710371591



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -240,301 +360,38 @@ private void createTableLocally(
      * @param tblId Table id.
      * @param assignment Affinity assignment.
      */
-    private void dropTableLocally(String name, UUID tblId, List<List<ClusterNode>> assignment) {
-        int partitions = assignment.size();
-
-        for (int p = 0; p < partitions; p++)
-            raftMgr.stopRaftGroup(raftGroupName(tblId, p), assignment.get(p));
-
-        TableImpl table = tables.get(name);
-
-        assert table != null : "There is no table with the name specified [name=" + name + ']';
-
-        onEvent(TableEvent.DROP, new TableEventParameters(table), null);
-    }
-
-    /**
-     * Compounds a RAFT group unique name.
-     *
-     * @param tableId Table identifier.
-     * @param partition Number of table partitions.
-     * @return A RAFT group name.
-     */
-    @NotNull private String raftGroupName(UUID tableId, int partition) {
-        return tableId + "_part_" + partition;
-    }
-
-    /**
-     * Table configuration changed callback.
-     *
-     * @param rev Storage revision.
-     * @param oldCfg Old configuration.
-     * @param newCfg New configuration.
-     * @return Operation future.
-     */
-    @NotNull private CompletableFuture<?> onConfigurationChanged(
-        long rev,
-        @Nullable NamedListView<TableView> oldCfg,
-        @Nullable NamedListView<TableView> newCfg
-    ) {
-        Set<String> tablesToStart = (newCfg == null || newCfg.namedListKeys() == null) ?
-            Collections.emptySet() :
-            newCfg.namedListKeys().stream().filter(t -> !oldCfg.namedListKeys().contains(t)).collect(Collectors.toSet());
-
-        Set<String> tablesToStop = (oldCfg == null || oldCfg.namedListKeys() == null) ?
-            Collections.emptySet() :
-            oldCfg.namedListKeys().stream().filter(t -> !newCfg.namedListKeys().contains(t)).collect(Collectors.toSet());
-
-        List<CompletableFuture<Boolean>> futs = new ArrayList<>();
-
-        final Set<String> schemaChanged =
-            (oldCfg != null && oldCfg.namedListKeys() != null && newCfg != null && newCfg.namedListKeys() != null) ?
-                oldCfg.namedListKeys().stream()
-                    .filter(tblName -> newCfg.namedListKeys().contains(tblName)) // Filter changed tables.
-                    .filter(tblName -> {
-                        final TableView newTbl = newCfg.get(tblName);
-                        final TableView oldTbl = oldCfg.get(tblName);
-
-                        assert newTbl.columns().namedListKeys() != null && oldTbl.columns().namedListKeys() != null;
-
-                        if (!newTbl.columns().namedListKeys().equals(oldTbl.columns().namedListKeys()))
-                            return true;
-
-                        return newTbl.columns().namedListKeys().stream().anyMatch(k -> {
-                            final ColumnView newCol = newTbl.columns().get(k);
-                            final ColumnView oldCol = oldTbl.columns().get(k);
-
-                            assert oldCol != null;
-
-                            if (!Objects.equals(newCol.type(), oldCol.type()))
-                                throw new SchemaModificationException("Columns type change is not supported.");
-
-                            if (!Objects.equals(newCol.nullable(), oldCol.nullable()))
-                                throw new SchemaModificationException("Column nullability change is not supported");
-
-                            if (!Objects.equals(newCol.name(), oldCol.name()) &&
-                                oldTbl.indices().namedListKeys().stream()
-                                    .map(n -> oldTbl.indices().get(n))
-                                    .filter(idx -> PrimaryIndex.PRIMARY_KEY_INDEX_NAME.equals(idx.name()))
-                                    .anyMatch(idx -> idx.columns().namedListKeys().stream()
-                                        .anyMatch(c -> idx.columns().get(c).name().equals(oldCol.name()))
-                                    ))
-                                throw new SchemaModificationException("Key column rename is not supported");
-
-                            return !Objects.equals(newCol.name(), oldCol.name()) ||
-                                !Objects.equals(newCol.defaultValue(), oldCol.defaultValue());
-                        });
-                    }).collect(Collectors.toSet()) :
-                Collections.emptySet();
-
-        if (!tablesToStart.isEmpty())
-            futs.addAll(startTables(tablesToStart, rev, newCfg));
-
-        if (!schemaChanged.isEmpty())
-            futs.addAll(changeSchema(schemaChanged, oldCfg, newCfg));
-
-        if (!tablesToStop.isEmpty())
-            futs.addAll(stopTables(tablesToStop));
-
-        return CompletableFuture.allOf(futs.toArray(CompletableFuture[]::new));
-    }
-
-    /**
-     * Start tables routine.
-     *
-     * @param tbls Tables to start.
-     * @param rev Metastore revision.
-     * @param cfgs Table configurations.
-     * @return Table creation futures.
-     */
-    private List<CompletableFuture<Boolean>> startTables(Set<String> tbls, long rev, NamedListView<TableView> cfgs) {
-        boolean hasMetastorageLocally = metaStorageMgr.hasMetastorageLocally(nodeCfgMgr);
-
-        List<CompletableFuture<Boolean>> futs = new ArrayList<>();
-
-        for (String tblName : tbls) {
-            TableView tableView = cfgs.get(tblName);
-
-            UUID tblId = new UUID(rev, 0L);
-
-            if (hasMetastorageLocally) {
-                var key = new ByteArray(INTERNAL_PREFIX + tblId);
-                futs.add(metaStorageMgr.invoke(
-                    Conditions.notExists(key),
-                    Operations.put(key, tableView.name().getBytes(StandardCharsets.UTF_8)),
-                    Operations.noop())
-                    .thenCompose(res -> schemaMgr.initSchemaForTable(tblId, tableView.name()))
-                    .thenCompose(res -> affMgr.calculateAssignments(tblId, tableView.name())));
-            }
-
-            final CompletableFuture<AffinityEventParameters> affinityReadyFut = new CompletableFuture<>();
-            final CompletableFuture<SchemaEventParameters> schemaReadyFut = new CompletableFuture<>();
-
-            CompletableFuture.allOf(affinityReadyFut, schemaReadyFut)
-                .exceptionally(e -> {
-                    LOG.error("Failed to create a new table [name=" + tblName + ", id=" + tblId + ']', e);
-
-                    onEvent(TableEvent.CREATE, new TableEventParameters(tblId, tblName), e);
-
-                    return null;
-                })
-                .thenRun(() -> createTableLocally(
-                    tblName,
-                    tblId,
-                    affinityReadyFut.join().assignment(),
-                    schemaReadyFut.join().schemaRegistry()
-                ));
+    private void dropTableLocally(String name, IgniteUuid tblId, List<List<ClusterNode>> assignment) {
+        try {
+            int partitions = assignment.size();
 
-            affMgr.listen(AffinityEvent.CALCULATED, new EventListener<>() {
-                @Override public boolean notify(@NotNull AffinityEventParameters parameters, @Nullable Throwable e) {
-                    if (!tblId.equals(parameters.tableId()))
-                        return false;
+            for (int p = 0; p < partitions; p++)
+                raftMgr.stopRaftGroup(raftGroupName(tblId, p), assignment.get(p));
 
-                    if (e == null)
-                        affinityReadyFut.complete(parameters);
-                    else
-                        affinityReadyFut.completeExceptionally(e);
+            TableImpl table = tables.get(name);
 
-                    return true;
-                }
+            assert table != null : "There is no table with the name specified [name=" + name + ']';
 
-                @Override public void remove(@NotNull Throwable e) {
-                    affinityReadyFut.completeExceptionally(e);
-                }
-            });
+            tables.remove(name);
+            tablesById.remove(table.tableId());
 
-            schemaMgr.listen(SchemaEvent.INITIALIZED, new EventListener<>() {
-                @Override public boolean notify(@NotNull SchemaEventParameters parameters, @Nullable Throwable e) {
-                    if (!tblId.equals(parameters.tableId()) && parameters.schemaRegistry().lastSchemaVersion() >= 1)
-                        return false;
+            fireEvent(TableEvent.DROP, new TableEventParameters(table), null);
 
-                    if (e == null)
-                        schemaReadyFut.complete(parameters);
-                    else
-                        schemaReadyFut.completeExceptionally(e);
-
-                    return true;
-                }
-
-                @Override public void remove(@NotNull Throwable e) {
-                    schemaReadyFut.completeExceptionally(e);
-                }
-            });
+            Optional.ofNullable(dropTblIntention.get(tblId)).ifPresent(f -> f.complete(null));
         }
-
-        return futs;
-    }
-
-    /**
-     * Drop tables routine.
-     *
-     * @param tbls Tables to drop.
-     * @return Table drop futures.
-     */
-    private List<CompletableFuture<Boolean>> stopTables(Set<String> tbls) {
-        boolean hasMetastorageLocally = metaStorageMgr.hasMetastorageLocally(nodeCfgMgr);
-
-        List<CompletableFuture<Boolean>> futs = new ArrayList<>();
-
-        for (String tblName : tbls) {
-            TableImpl t = tables.get(tblName);
-
-            UUID tblId = t.tableId();
-
-            if (hasMetastorageLocally) {
-                var key = new ByteArray(INTERNAL_PREFIX + tblId);
-
-                futs.add(affMgr.removeAssignment(tblId)
-                    .thenCompose(res -> schemaMgr.unregisterSchemas(tblId))
-                    .thenCompose(res ->
-                        metaStorageMgr.invoke(Conditions.exists(key),
-                            Operations.remove(key),
-                            Operations.noop())));
-            }
-
-            affMgr.listen(AffinityEvent.REMOVED, new EventListener<>() {
-                @Override public boolean notify(@NotNull AffinityEventParameters parameters, @Nullable Throwable e) {
-                    if (!tblId.equals(parameters.tableId()))
-                        return false;
-
-                    if (e == null)
-                        dropTableLocally(tblName, tblId, parameters.assignment());
-                    else
-                        onEvent(TableEvent.DROP, new TableEventParameters(tblId, tblName), e);
-
-                    return true;
-                }
-
-                @Override public void remove(@NotNull Throwable e) {
-                    onEvent(TableEvent.DROP, new TableEventParameters(tblId, tblName), e);
-                }
-            });
+        catch (Exception e) {

Review comment:
       Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r710908593



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+import java.util.Optional;
+import org.apache.ignite.configuration.schemas.table.ColumnView;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
+import org.apache.ignite.internal.schema.configuration.SchemaDescriptorConverter;
+import org.apache.ignite.internal.schema.mapping.ColumnMapper;
+import org.apache.ignite.internal.schema.mapping.ColumnMapping;
+import org.apache.ignite.lang.LoggerMessageHelper;
+import org.apache.ignite.schema.SchemaTable;
+
+/**
+ * Stateless schema utils that produces helper methods for schema preparation.
+ * <p>
+ * Schemas itself MUST be registered in a version ascending order incrementing by {@code 1} with NO gaps,
+ * otherwise an exception will be thrown. The version numbering starts from the {@code 1}.
+ * <p>
+ * After some table maintenance process some first versions may become outdated and can be safely cleaned up
+ * if the process guarantees the table no longer has a data of these versions.
+ *
+ * @implSpec The changes in between two arbitrary actual versions MUST NOT be lost.
+ * Thus, schema versions can only be removed from the beginning.
+ * @implSpec Initial schema history MAY be registered without the first outdated versions
+ * that could be cleaned up earlier.

Review comment:
       Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r707504488



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/mapping/ColumnMapper.java
##########
@@ -24,6 +24,7 @@
 /**
  * Column mapper interface.
  */
+// TODO: IGNITE-15480 Probably should be removed after index column mapping refactoring.

Review comment:
       I believe that within column to column in index context we don't need columnIndex (if you mean that column index here is position in columns list).
   
   From the user's perspective he creates index using column names: 
   ```
           SchemaTable scmTbl1 = SchemaBuilders.tableBuilder("PUBLIC", "tbl1").columns(
               SchemaBuilders.column("key", ColumnType.UUID).asNonNull().build(),
               SchemaBuilders.column("affKey", ColumnType.INT64).asNonNull().build(),
               SchemaBuilders.column("valStr", ColumnType.string()).asNullable().build(),
               SchemaBuilders.column("valInt", ColumnType.INT32).asNullable().build(),
               SchemaBuilders.column("valNull", ColumnType.INT16).asNullable().build()
           ).withIndex(
               SchemaBuilders.pkIndex()
                   .addIndexColumn("key").done()
                   .addIndexColumn("affKey").done()
                   .withAffinityColumns("affKey")
                   .build()
           ).build();
   ```
   However during configuration change we generate id for every column and substitute name with such id within "index schema":
   table
   ..name:PUBLIC.tbl1
   ..id: 41f7a3ca
   ..columns
   ....listItem0
   ......namedListKey: 0 (or "key" or 870024f6) it doesn't matter here 
   ......namedListVal:
   ........name: key
   ........**id: 870024f6**
   ........type: UUID
   ........nullable: false
      ...
   ..indexes
        ....
   ......name: PK_INDEX
   ......**columnIds: [870024f6]** instead of colNames
   
   Does it make sense?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] zstan commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
zstan commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r707458546



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -123,42 +122,143 @@
     private final Map<String, TableImpl> tables = new ConcurrentHashMap<>();
 
     /** Tables. */
-    private final Map<UUID, TableImpl> tablesById = new ConcurrentHashMap<>();
+    private final Map<IgniteUuid, TableImpl> tablesById = new ConcurrentHashMap<>();
+
+    /**
+     * tableId -> future that adds an ability to await distributed table creation on local node from within the context
+     * of user's table creation intention.
+     *
+     * In other words, awaiting local {@link TableManager#createTableLocally(String, IgniteUuid, List, SchemaDescriptor)}
+     * from within {@link TableManager#createTableAsync(String, Consumer, boolean)}
+     */
+    private final Map<IgniteUuid, CompletableFuture<Table>> createTblIntention = new ConcurrentHashMap<>();
+
+    /**
+     * tableId -> future that adds an ability to await distributed table alter on local node from within the context
+     * of user's table alter intention.
+     *
+     * In other words, awaiting local alter table as a reaction on distributed event
+     * from within {@link TableManager#createTableAsync(String, Consumer, boolean)}
+     */
+    private final Map<IgniteUuid, CompletableFuture<Void>> alterTblIntention = new ConcurrentHashMap<>();
+
+    /**
+     * tableId -> future that adds an ability to await distributed table drop on local node from within the context
+     * of user's table drop intention.
+     *
+     * In other words, awaiting local {@link TableManager#dropTableLocally(String, IgniteUuid, List)}
+     * from within {@link TableManager#dropTableAsync(String)}
+     */
+    private final Map<IgniteUuid, CompletableFuture<Void>> dropTblIntention = new ConcurrentHashMap<>();
 
     /**
      * Creates a new table manager.
      *
-     * @param nodeCfgMgr Node configuration manager.
      * @param clusterCfgMgr Cluster configuration manager.
-     * @param metaStorageMgr Meta storage manager.
-     * @param schemaMgr Schema manager.
-     * @param affMgr Affinity manager.
      * @param raftMgr Raft manager.
+     * @param baselineMgr Baseline manager.
+     * @param metaStorageMgr Meta storage manager.
      * @param partitionsStoreDir Partitions store directory.
      */
     public TableManager(
-        ConfigurationManager nodeCfgMgr,
         ConfigurationManager clusterCfgMgr,
-        MetaStorageManager metaStorageMgr,
-        SchemaManager schemaMgr,
-        AffinityManager affMgr,
         Loza raftMgr,
+        BaselineManager baselineMgr,
+        MetaStorageManager metaStorageMgr,
         Path partitionsStoreDir
     ) {
-        this.nodeCfgMgr = nodeCfgMgr;
         this.clusterCfgMgr = clusterCfgMgr;
-        this.metaStorageMgr = metaStorageMgr;
-        this.affMgr = affMgr;
         this.raftMgr = raftMgr;
-        this.schemaMgr = schemaMgr;
+        this.baselineMgr = baselineMgr;
+        this.metaStorageMgr = metaStorageMgr;
         this.partitionsStoreDir = partitionsStoreDir;
     }
 
     /** {@inheritDoc} */
     @Override public void start() {
-        //TODO: IGNITE-14652 Change a metastorage update in listeners to multi-invoke
-        clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().listen(ctx -> {
-            return onConfigurationChanged(ctx.storageRevision(), ctx.oldValue(), ctx.newValue());
+        clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().
+            listenElements(new ConfigurationNamedListListener<TableView>() {
+            @Override
+            public @NotNull CompletableFuture<?> onCreate(@NotNull ConfigurationNotificationEvent<TableView> ctx) {
+                // Empty assignments might be a valid case if tables are created from within cluster init HOCON
+                // configuration, which is not supported now.
+                assert ((ExtendedTableView)ctx.newValue()).assignments() != null :
+                    "Table =[" + ctx.newValue().name() + "] has empty assignments.";
+
+                final IgniteUuid tblId = IgniteUuid.fromString(((ExtendedTableView)ctx.newValue()).id());

Review comment:
       ctx.newValue() - marked as Nullable




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r709420636



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -627,37 +516,57 @@ private void dropTableLocally(String name, UUID tblId, List<List<ClusterNode>> a
     @Override public CompletableFuture<Void> alterTableAsync(String name, Consumer<TableChange> tableChange) {
         CompletableFuture<Void> tblFut = new CompletableFuture<>();
 
-        listen(TableEvent.ALTER, new EventListener<>() {
-            @Override public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) {
-                String tableName = parameters.tableName();
-
-                if (!name.equals(tableName))
-                    return false;
-
-                if (e == null)
-                    tblFut.complete(null);
-                else
-                    tblFut.completeExceptionally(e);
-
-                return true;
+        tableAsync(name, true).thenAccept(tbl -> {
+            if (tbl == null) {
+                tblFut.completeExceptionally(new IgniteException(
+                    LoggerMessageHelper.format("Table already exists [name={}]", name)));

Review comment:
       Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r708613836



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -627,37 +516,57 @@ private void dropTableLocally(String name, UUID tblId, List<List<ClusterNode>> a
     @Override public CompletableFuture<Void> alterTableAsync(String name, Consumer<TableChange> tableChange) {
         CompletableFuture<Void> tblFut = new CompletableFuture<>();
 
-        listen(TableEvent.ALTER, new EventListener<>() {
-            @Override public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) {
-                String tableName = parameters.tableName();
-
-                if (!name.equals(tableName))
-                    return false;
-
-                if (e == null)
-                    tblFut.complete(null);
-                else
-                    tblFut.completeExceptionally(e);
-
-                return true;
+        tableAsync(name, true).thenAccept(tbl -> {
+            if (tbl == null) {
+                tblFut.completeExceptionally(new IgniteException(
+                    LoggerMessageHelper.format("Table already exists [name={}]", name)));

Review comment:
       It is a wrong message.
   Table does not exist and cannot be altered.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] zstan commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
zstan commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r707478098



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -123,42 +122,143 @@
     private final Map<String, TableImpl> tables = new ConcurrentHashMap<>();
 
     /** Tables. */
-    private final Map<UUID, TableImpl> tablesById = new ConcurrentHashMap<>();
+    private final Map<IgniteUuid, TableImpl> tablesById = new ConcurrentHashMap<>();
+
+    /**
+     * tableId -> future that adds an ability to await distributed table creation on local node from within the context
+     * of user's table creation intention.
+     *
+     * In other words, awaiting local {@link TableManager#createTableLocally(String, IgniteUuid, List, SchemaDescriptor)}
+     * from within {@link TableManager#createTableAsync(String, Consumer, boolean)}
+     */
+    private final Map<IgniteUuid, CompletableFuture<Table>> createTblIntention = new ConcurrentHashMap<>();
+
+    /**
+     * tableId -> future that adds an ability to await distributed table alter on local node from within the context
+     * of user's table alter intention.
+     *
+     * In other words, awaiting local alter table as a reaction on distributed event
+     * from within {@link TableManager#createTableAsync(String, Consumer, boolean)}
+     */
+    private final Map<IgniteUuid, CompletableFuture<Void>> alterTblIntention = new ConcurrentHashMap<>();
+
+    /**
+     * tableId -> future that adds an ability to await distributed table drop on local node from within the context
+     * of user's table drop intention.
+     *
+     * In other words, awaiting local {@link TableManager#dropTableLocally(String, IgniteUuid, List)}
+     * from within {@link TableManager#dropTableAsync(String)}
+     */
+    private final Map<IgniteUuid, CompletableFuture<Void>> dropTblIntention = new ConcurrentHashMap<>();
 
     /**
      * Creates a new table manager.
      *
-     * @param nodeCfgMgr Node configuration manager.
      * @param clusterCfgMgr Cluster configuration manager.
-     * @param metaStorageMgr Meta storage manager.
-     * @param schemaMgr Schema manager.
-     * @param affMgr Affinity manager.
      * @param raftMgr Raft manager.
+     * @param baselineMgr Baseline manager.
+     * @param metaStorageMgr Meta storage manager.
      * @param partitionsStoreDir Partitions store directory.
      */
     public TableManager(
-        ConfigurationManager nodeCfgMgr,
         ConfigurationManager clusterCfgMgr,
-        MetaStorageManager metaStorageMgr,
-        SchemaManager schemaMgr,
-        AffinityManager affMgr,
         Loza raftMgr,
+        BaselineManager baselineMgr,
+        MetaStorageManager metaStorageMgr,
         Path partitionsStoreDir
     ) {
-        this.nodeCfgMgr = nodeCfgMgr;
         this.clusterCfgMgr = clusterCfgMgr;
-        this.metaStorageMgr = metaStorageMgr;
-        this.affMgr = affMgr;
         this.raftMgr = raftMgr;
-        this.schemaMgr = schemaMgr;
+        this.baselineMgr = baselineMgr;
+        this.metaStorageMgr = metaStorageMgr;
         this.partitionsStoreDir = partitionsStoreDir;
     }
 
     /** {@inheritDoc} */
     @Override public void start() {
-        //TODO: IGNITE-14652 Change a metastorage update in listeners to multi-invoke
-        clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().listen(ctx -> {
-            return onConfigurationChanged(ctx.storageRevision(), ctx.oldValue(), ctx.newValue());
+        clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().
+            listenElements(new ConfigurationNamedListListener<TableView>() {
+            @Override
+            public @NotNull CompletableFuture<?> onCreate(@NotNull ConfigurationNotificationEvent<TableView> ctx) {
+                // Empty assignments might be a valid case if tables are created from within cluster init HOCON
+                // configuration, which is not supported now.
+                assert ((ExtendedTableView)ctx.newValue()).assignments() != null :
+                    "Table =[" + ctx.newValue().name() + "] has empty assignments.";
+
+                final IgniteUuid tblId = IgniteUuid.fromString(((ExtendedTableView)ctx.newValue()).id());
+
+                // TODO: IGNITE-15409 Listener with any placeholder should be used instead.
+                ((ExtendedTableConfiguration) clusterCfgMgr.configurationRegistry().
+                    getConfiguration(TablesConfiguration.KEY).tables().get(ctx.newValue().name())).schemas().
+                    listenElements(new ConfigurationNamedListListener<>() {
+                        @Override public @NotNull CompletableFuture<?> onCreate(
+                            @NotNull ConfigurationNotificationEvent<SchemaView> schemasCtx) {
+                            try {
+                                ((SchemaRegistryImpl)tables.get(ctx.newValue().name()).schemaRegistry()).
+                                    onSchemaRegistered((SchemaDescriptor)ByteUtils.
+                                        fromBytes(schemasCtx.newValue().schema()));
+
+                                fireEvent(TableEvent.ALTER, new TableEventParameters(tablesById.get(tblId)), null);
+
+                                Optional.ofNullable(alterTblIntention.get(tblId)).ifPresent(f -> f.complete(null));
+                            }
+                            catch (Exception e) {
+                                Optional.ofNullable(alterTblIntention.get(tblId)).ifPresent(f -> f.completeExceptionally(e));
+                            }
+
+                            return CompletableFuture.completedFuture(null);
+                        }
+
+                        @Override
+                        public @NotNull CompletableFuture<?> onRename(@NotNull String oldName, @NotNull String newName,
+                            @NotNull ConfigurationNotificationEvent<SchemaView> ctx) {
+                            return CompletableFuture.completedFuture(null);
+                        }
+
+                        @Override public @NotNull CompletableFuture<?> onDelete(
+                            @NotNull ConfigurationNotificationEvent<SchemaView> ctx) {
+                            return CompletableFuture.completedFuture(null);
+                        }
+
+                        @Override public @NotNull CompletableFuture<?> onUpdate(
+                            @NotNull ConfigurationNotificationEvent<SchemaView> ctx) {
+                            return CompletableFuture.completedFuture(null);
+                        }
+                    });
+
+                createTableLocally(
+                    ctx.newValue().name(),
+                    IgniteUuid.fromString(((ExtendedTableView)ctx.newValue()).id()),
+                    (List<List<ClusterNode>>)ByteUtils.fromBytes(((ExtendedTableView)ctx.newValue()).assignments()),
+                    (SchemaDescriptor)ByteUtils.fromBytes(((ExtendedTableView)ctx.newValue()).schemas().
+                        get(String.valueOf(INITIAL_SCHEMA_VERSION)).schema())
+                );
+
+                return CompletableFuture.completedFuture(null);
+            }
+
+            @Override public @NotNull CompletableFuture<?> onRename(@NotNull String oldName, @NotNull String newName,
+                @NotNull ConfigurationNotificationEvent<TableView> ctx) {
+                // TODO: IGNITE-15485 Support table rename operation.
+
+                return CompletableFuture.completedFuture(null);
+            }
+
+            @Override

Review comment:
        @Override on a new line




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r708429383



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -240,301 +360,38 @@ private void createTableLocally(
      * @param tblId Table id.
      * @param assignment Affinity assignment.
      */
-    private void dropTableLocally(String name, UUID tblId, List<List<ClusterNode>> assignment) {
-        int partitions = assignment.size();
-
-        for (int p = 0; p < partitions; p++)
-            raftMgr.stopRaftGroup(raftGroupName(tblId, p), assignment.get(p));
-
-        TableImpl table = tables.get(name);
-
-        assert table != null : "There is no table with the name specified [name=" + name + ']';
-
-        onEvent(TableEvent.DROP, new TableEventParameters(table), null);
-    }
-
-    /**
-     * Compounds a RAFT group unique name.
-     *
-     * @param tableId Table identifier.
-     * @param partition Number of table partitions.
-     * @return A RAFT group name.
-     */
-    @NotNull private String raftGroupName(UUID tableId, int partition) {
-        return tableId + "_part_" + partition;
-    }
-
-    /**
-     * Table configuration changed callback.
-     *
-     * @param rev Storage revision.
-     * @param oldCfg Old configuration.
-     * @param newCfg New configuration.
-     * @return Operation future.
-     */
-    @NotNull private CompletableFuture<?> onConfigurationChanged(
-        long rev,
-        @Nullable NamedListView<TableView> oldCfg,
-        @Nullable NamedListView<TableView> newCfg
-    ) {
-        Set<String> tablesToStart = (newCfg == null || newCfg.namedListKeys() == null) ?
-            Collections.emptySet() :
-            newCfg.namedListKeys().stream().filter(t -> !oldCfg.namedListKeys().contains(t)).collect(Collectors.toSet());
-
-        Set<String> tablesToStop = (oldCfg == null || oldCfg.namedListKeys() == null) ?
-            Collections.emptySet() :
-            oldCfg.namedListKeys().stream().filter(t -> !newCfg.namedListKeys().contains(t)).collect(Collectors.toSet());
-
-        List<CompletableFuture<Boolean>> futs = new ArrayList<>();
-
-        final Set<String> schemaChanged =
-            (oldCfg != null && oldCfg.namedListKeys() != null && newCfg != null && newCfg.namedListKeys() != null) ?
-                oldCfg.namedListKeys().stream()
-                    .filter(tblName -> newCfg.namedListKeys().contains(tblName)) // Filter changed tables.
-                    .filter(tblName -> {
-                        final TableView newTbl = newCfg.get(tblName);
-                        final TableView oldTbl = oldCfg.get(tblName);
-
-                        assert newTbl.columns().namedListKeys() != null && oldTbl.columns().namedListKeys() != null;
-
-                        if (!newTbl.columns().namedListKeys().equals(oldTbl.columns().namedListKeys()))
-                            return true;
-
-                        return newTbl.columns().namedListKeys().stream().anyMatch(k -> {
-                            final ColumnView newCol = newTbl.columns().get(k);
-                            final ColumnView oldCol = oldTbl.columns().get(k);
-
-                            assert oldCol != null;
-
-                            if (!Objects.equals(newCol.type(), oldCol.type()))
-                                throw new SchemaModificationException("Columns type change is not supported.");
-
-                            if (!Objects.equals(newCol.nullable(), oldCol.nullable()))
-                                throw new SchemaModificationException("Column nullability change is not supported");
-
-                            if (!Objects.equals(newCol.name(), oldCol.name()) &&
-                                oldTbl.indices().namedListKeys().stream()
-                                    .map(n -> oldTbl.indices().get(n))
-                                    .filter(idx -> PrimaryIndex.PRIMARY_KEY_INDEX_NAME.equals(idx.name()))
-                                    .anyMatch(idx -> idx.columns().namedListKeys().stream()
-                                        .anyMatch(c -> idx.columns().get(c).name().equals(oldCol.name()))
-                                    ))
-                                throw new SchemaModificationException("Key column rename is not supported");
-
-                            return !Objects.equals(newCol.name(), oldCol.name()) ||
-                                !Objects.equals(newCol.defaultValue(), oldCol.defaultValue());
-                        });
-                    }).collect(Collectors.toSet()) :
-                Collections.emptySet();
-
-        if (!tablesToStart.isEmpty())
-            futs.addAll(startTables(tablesToStart, rev, newCfg));
-
-        if (!schemaChanged.isEmpty())
-            futs.addAll(changeSchema(schemaChanged, oldCfg, newCfg));
-
-        if (!tablesToStop.isEmpty())
-            futs.addAll(stopTables(tablesToStop));
-
-        return CompletableFuture.allOf(futs.toArray(CompletableFuture[]::new));
-    }
-
-    /**
-     * Start tables routine.
-     *
-     * @param tbls Tables to start.
-     * @param rev Metastore revision.
-     * @param cfgs Table configurations.
-     * @return Table creation futures.
-     */
-    private List<CompletableFuture<Boolean>> startTables(Set<String> tbls, long rev, NamedListView<TableView> cfgs) {
-        boolean hasMetastorageLocally = metaStorageMgr.hasMetastorageLocally(nodeCfgMgr);
-
-        List<CompletableFuture<Boolean>> futs = new ArrayList<>();
-
-        for (String tblName : tbls) {
-            TableView tableView = cfgs.get(tblName);
-
-            UUID tblId = new UUID(rev, 0L);
-
-            if (hasMetastorageLocally) {
-                var key = new ByteArray(INTERNAL_PREFIX + tblId);
-                futs.add(metaStorageMgr.invoke(
-                    Conditions.notExists(key),
-                    Operations.put(key, tableView.name().getBytes(StandardCharsets.UTF_8)),
-                    Operations.noop())
-                    .thenCompose(res -> schemaMgr.initSchemaForTable(tblId, tableView.name()))
-                    .thenCompose(res -> affMgr.calculateAssignments(tblId, tableView.name())));
-            }
-
-            final CompletableFuture<AffinityEventParameters> affinityReadyFut = new CompletableFuture<>();
-            final CompletableFuture<SchemaEventParameters> schemaReadyFut = new CompletableFuture<>();
-
-            CompletableFuture.allOf(affinityReadyFut, schemaReadyFut)
-                .exceptionally(e -> {
-                    LOG.error("Failed to create a new table [name=" + tblName + ", id=" + tblId + ']', e);
-
-                    onEvent(TableEvent.CREATE, new TableEventParameters(tblId, tblName), e);
-
-                    return null;
-                })
-                .thenRun(() -> createTableLocally(
-                    tblName,
-                    tblId,
-                    affinityReadyFut.join().assignment(),
-                    schemaReadyFut.join().schemaRegistry()
-                ));
+    private void dropTableLocally(String name, IgniteUuid tblId, List<List<ClusterNode>> assignment) {
+        try {
+            int partitions = assignment.size();
 
-            affMgr.listen(AffinityEvent.CALCULATED, new EventListener<>() {
-                @Override public boolean notify(@NotNull AffinityEventParameters parameters, @Nullable Throwable e) {
-                    if (!tblId.equals(parameters.tableId()))
-                        return false;
+            for (int p = 0; p < partitions; p++)
+                raftMgr.stopRaftGroup(raftGroupName(tblId, p), assignment.get(p));
 
-                    if (e == null)
-                        affinityReadyFut.complete(parameters);
-                    else
-                        affinityReadyFut.completeExceptionally(e);
+            TableImpl table = tables.get(name);
 
-                    return true;
-                }
+            assert table != null : "There is no table with the name specified [name=" + name + ']';
 
-                @Override public void remove(@NotNull Throwable e) {
-                    affinityReadyFut.completeExceptionally(e);
-                }
-            });
+            tables.remove(name);
+            tablesById.remove(table.tableId());
 
-            schemaMgr.listen(SchemaEvent.INITIALIZED, new EventListener<>() {
-                @Override public boolean notify(@NotNull SchemaEventParameters parameters, @Nullable Throwable e) {
-                    if (!tblId.equals(parameters.tableId()) && parameters.schemaRegistry().lastSchemaVersion() >= 1)
-                        return false;
+            fireEvent(TableEvent.DROP, new TableEventParameters(table), null);
 
-                    if (e == null)
-                        schemaReadyFut.complete(parameters);
-                    else
-                        schemaReadyFut.completeExceptionally(e);
-
-                    return true;
-                }
-
-                @Override public void remove(@NotNull Throwable e) {
-                    schemaReadyFut.completeExceptionally(e);
-                }
-            });
+            Optional.ofNullable(dropTblIntention.get(tblId)).ifPresent(f -> f.complete(null));
         }
-
-        return futs;
-    }
-
-    /**
-     * Drop tables routine.
-     *
-     * @param tbls Tables to drop.
-     * @return Table drop futures.
-     */
-    private List<CompletableFuture<Boolean>> stopTables(Set<String> tbls) {
-        boolean hasMetastorageLocally = metaStorageMgr.hasMetastorageLocally(nodeCfgMgr);
-
-        List<CompletableFuture<Boolean>> futs = new ArrayList<>();
-
-        for (String tblName : tbls) {
-            TableImpl t = tables.get(tblName);
-
-            UUID tblId = t.tableId();
-
-            if (hasMetastorageLocally) {
-                var key = new ByteArray(INTERNAL_PREFIX + tblId);
-
-                futs.add(affMgr.removeAssignment(tblId)
-                    .thenCompose(res -> schemaMgr.unregisterSchemas(tblId))
-                    .thenCompose(res ->
-                        metaStorageMgr.invoke(Conditions.exists(key),
-                            Operations.remove(key),
-                            Operations.noop())));
-            }
-
-            affMgr.listen(AffinityEvent.REMOVED, new EventListener<>() {
-                @Override public boolean notify(@NotNull AffinityEventParameters parameters, @Nullable Throwable e) {
-                    if (!tblId.equals(parameters.tableId()))
-                        return false;
-
-                    if (e == null)
-                        dropTableLocally(tblName, tblId, parameters.assignment());
-                    else
-                        onEvent(TableEvent.DROP, new TableEventParameters(tblId, tblName), e);
-
-                    return true;
-                }
-
-                @Override public void remove(@NotNull Throwable e) {
-                    onEvent(TableEvent.DROP, new TableEventParameters(tblId, tblName), e);
-                }
-            });
+        catch (Exception e) {

Review comment:
       Drop table event with the exception have to fire here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r708371761



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/configuration/schema/ExtendedTableConfigurationSchema.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.configuration.schema;
+
+import org.apache.ignite.configuration.annotation.InternalConfiguration;
+import org.apache.ignite.configuration.annotation.NamedConfigValue;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.schemas.table.TableConfigurationSchema;
+import org.apache.ignite.configuration.validation.Immutable;
+
+/**
+ * Extended table configuration schema class.
+ */
+@InternalConfiguration
+public class ExtendedTableConfigurationSchema extends TableConfigurationSchema {
+    /** Table id. String representation of {@link org.apache.ignite.lang.IgniteUuid}. */
+    @Value
+    @Immutable
+    public String id;

Review comment:
       Neither IgniteUuid nor UUID are supported in configuration.
   For the same reason byte array is used for affinity assignments - neither collections nor array of arrays are supported. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r709390053



##########
File path: modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityService.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Stateless affinity service that produces helper methods for an affinity assignments calculation.
+ */
+public class AffinityService {
+    /**
+     * Calculates affinity assignments.
+     *
+     * @param partitions Partitions count.
+     * @param replicas Replicas count.
+     * @return List nodes by partition.
+     */
+    public static List<List<ClusterNode>> calculateAssignments(

Review comment:
       Renamed to AffinityUtils, same for SchemaService->SchemaUtils.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] AMashenkov commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
AMashenkov commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r707450444



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -123,42 +122,143 @@
     private final Map<String, TableImpl> tables = new ConcurrentHashMap<>();
 
     /** Tables. */
-    private final Map<UUID, TableImpl> tablesById = new ConcurrentHashMap<>();
+    private final Map<IgniteUuid, TableImpl> tablesById = new ConcurrentHashMap<>();
+
+    /**
+     * tableId -> future that adds an ability to await distributed table creation on local node from within the context
+     * of user's table creation intention.
+     *
+     * In other words, awaiting local {@link TableManager#createTableLocally(String, IgniteUuid, List, SchemaDescriptor)}
+     * from within {@link TableManager#createTableAsync(String, Consumer, boolean)}
+     */
+    private final Map<IgniteUuid, CompletableFuture<Table>> createTblIntention = new ConcurrentHashMap<>();

Review comment:
       Seems, we can have only one intention for every single table at any time: either to add, drop or alter, but not any combination. Or do we?
   Maybe we can extend Completable future and enrich it with additional information? and use a single collection?
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] AMashenkov commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
AMashenkov commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r708096914



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/mapping/ColumnMapper.java
##########
@@ -24,6 +24,7 @@
 /**
  * Column mapper interface.
  */
+// TODO: IGNITE-15480 Probably should be removed after index column mapping refactoring.

Review comment:
       Assume there is a row R with schema V1.
   Then you update schema to V2 and add a column C with a default D1.
   Then you update schema to V3 and update default to D2.
   
   Now, you read R, which is stored as of V1 version, and upgrade it to V3. 
   On one side, R doesn't contains C column, and there is no descriptor for column C in V1.
   On another side, you can't use column C descriptor from V3 because it was changed in between.
   Using column C descriptor from V2 will be correct, but you will need to go through schema history for that for every row and every column you read just to check if there is any history.
   Column mapper merges schema changes once and then get reused.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r708434814



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -123,42 +122,143 @@
     private final Map<String, TableImpl> tables = new ConcurrentHashMap<>();
 
     /** Tables. */
-    private final Map<UUID, TableImpl> tablesById = new ConcurrentHashMap<>();
+    private final Map<IgniteUuid, TableImpl> tablesById = new ConcurrentHashMap<>();
+
+    /**
+     * tableId -> future that adds an ability to await distributed table creation on local node from within the context
+     * of user's table creation intention.
+     *
+     * In other words, awaiting local {@link TableManager#createTableLocally(String, IgniteUuid, List, SchemaDescriptor)}
+     * from within {@link TableManager#createTableAsync(String, Consumer, boolean)}
+     */
+    private final Map<IgniteUuid, CompletableFuture<Table>> createTblIntention = new ConcurrentHashMap<>();

Review comment:
       Possibly the logic with intentions might be reduced.
   Because now all places when the intention is applying, an event is also firing (create table intention -> create table event and so on).
   It looks like a duplicate behavior, I think possible to use only one of them.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] AMashenkov commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
AMashenkov commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r708096914



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/mapping/ColumnMapper.java
##########
@@ -24,6 +24,7 @@
 /**
  * Column mapper interface.
  */
+// TODO: IGNITE-15480 Probably should be removed after index column mapping refactoring.

Review comment:
       Assume there is a row R with schema V1.
   Then you update schema to V2 and add a column C with a default D1.
   Then you update schema to V3 and update default to D2.
   
   Now, you read R, which is stored as of V1 version, and upgrade it to V3. 
   On one side, R doesn't contains C column, and there is no descriptor for column C in V1.
   On another side, you can't use column C descriptor from V3 because it was changed in between.
   Using column C descriptor from V2 will be correct, but you will need to go through schema history for that for every row and every column you read just to check if there is any history.
   
   Column mapper merges schema changes once and then get reused.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] AMashenkov commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
AMashenkov commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r708102618



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/mapping/ColumnMapper.java
##########
@@ -24,6 +24,7 @@
 /**
  * Column mapper interface.
  */
+// TODO: IGNITE-15480 Probably should be removed after index column mapping refactoring.

Review comment:
       You suggestion about columnID look great, it makes add/drop simple, resolves ABA problem and gives rename for free.
   But we will need mapping columnID -> columnIdx in schemaDescriptor internals anyway.
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r710373300



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -240,301 +360,38 @@ private void createTableLocally(
      * @param tblId Table id.
      * @param assignment Affinity assignment.
      */
-    private void dropTableLocally(String name, UUID tblId, List<List<ClusterNode>> assignment) {
-        int partitions = assignment.size();
-
-        for (int p = 0; p < partitions; p++)
-            raftMgr.stopRaftGroup(raftGroupName(tblId, p), assignment.get(p));
-
-        TableImpl table = tables.get(name);
-
-        assert table != null : "There is no table with the name specified [name=" + name + ']';
-
-        onEvent(TableEvent.DROP, new TableEventParameters(table), null);
-    }
-
-    /**
-     * Compounds a RAFT group unique name.
-     *
-     * @param tableId Table identifier.
-     * @param partition Number of table partitions.
-     * @return A RAFT group name.
-     */
-    @NotNull private String raftGroupName(UUID tableId, int partition) {
-        return tableId + "_part_" + partition;
-    }
-
-    /**
-     * Table configuration changed callback.
-     *
-     * @param rev Storage revision.
-     * @param oldCfg Old configuration.
-     * @param newCfg New configuration.
-     * @return Operation future.
-     */
-    @NotNull private CompletableFuture<?> onConfigurationChanged(
-        long rev,
-        @Nullable NamedListView<TableView> oldCfg,
-        @Nullable NamedListView<TableView> newCfg
-    ) {
-        Set<String> tablesToStart = (newCfg == null || newCfg.namedListKeys() == null) ?
-            Collections.emptySet() :
-            newCfg.namedListKeys().stream().filter(t -> !oldCfg.namedListKeys().contains(t)).collect(Collectors.toSet());
-
-        Set<String> tablesToStop = (oldCfg == null || oldCfg.namedListKeys() == null) ?
-            Collections.emptySet() :
-            oldCfg.namedListKeys().stream().filter(t -> !newCfg.namedListKeys().contains(t)).collect(Collectors.toSet());
-
-        List<CompletableFuture<Boolean>> futs = new ArrayList<>();
-
-        final Set<String> schemaChanged =
-            (oldCfg != null && oldCfg.namedListKeys() != null && newCfg != null && newCfg.namedListKeys() != null) ?
-                oldCfg.namedListKeys().stream()
-                    .filter(tblName -> newCfg.namedListKeys().contains(tblName)) // Filter changed tables.
-                    .filter(tblName -> {
-                        final TableView newTbl = newCfg.get(tblName);
-                        final TableView oldTbl = oldCfg.get(tblName);
-
-                        assert newTbl.columns().namedListKeys() != null && oldTbl.columns().namedListKeys() != null;
-
-                        if (!newTbl.columns().namedListKeys().equals(oldTbl.columns().namedListKeys()))
-                            return true;
-
-                        return newTbl.columns().namedListKeys().stream().anyMatch(k -> {
-                            final ColumnView newCol = newTbl.columns().get(k);
-                            final ColumnView oldCol = oldTbl.columns().get(k);
-
-                            assert oldCol != null;
-
-                            if (!Objects.equals(newCol.type(), oldCol.type()))
-                                throw new SchemaModificationException("Columns type change is not supported.");
-
-                            if (!Objects.equals(newCol.nullable(), oldCol.nullable()))
-                                throw new SchemaModificationException("Column nullability change is not supported");
-
-                            if (!Objects.equals(newCol.name(), oldCol.name()) &&
-                                oldTbl.indices().namedListKeys().stream()
-                                    .map(n -> oldTbl.indices().get(n))
-                                    .filter(idx -> PrimaryIndex.PRIMARY_KEY_INDEX_NAME.equals(idx.name()))
-                                    .anyMatch(idx -> idx.columns().namedListKeys().stream()
-                                        .anyMatch(c -> idx.columns().get(c).name().equals(oldCol.name()))
-                                    ))
-                                throw new SchemaModificationException("Key column rename is not supported");
-
-                            return !Objects.equals(newCol.name(), oldCol.name()) ||
-                                !Objects.equals(newCol.defaultValue(), oldCol.defaultValue());
-                        });
-                    }).collect(Collectors.toSet()) :
-                Collections.emptySet();
-
-        if (!tablesToStart.isEmpty())
-            futs.addAll(startTables(tablesToStart, rev, newCfg));
-
-        if (!schemaChanged.isEmpty())
-            futs.addAll(changeSchema(schemaChanged, oldCfg, newCfg));
-
-        if (!tablesToStop.isEmpty())
-            futs.addAll(stopTables(tablesToStop));
-
-        return CompletableFuture.allOf(futs.toArray(CompletableFuture[]::new));
-    }
-
-    /**
-     * Start tables routine.
-     *
-     * @param tbls Tables to start.
-     * @param rev Metastore revision.
-     * @param cfgs Table configurations.
-     * @return Table creation futures.
-     */
-    private List<CompletableFuture<Boolean>> startTables(Set<String> tbls, long rev, NamedListView<TableView> cfgs) {
-        boolean hasMetastorageLocally = metaStorageMgr.hasMetastorageLocally(nodeCfgMgr);
-
-        List<CompletableFuture<Boolean>> futs = new ArrayList<>();
-
-        for (String tblName : tbls) {
-            TableView tableView = cfgs.get(tblName);
-
-            UUID tblId = new UUID(rev, 0L);
-
-            if (hasMetastorageLocally) {
-                var key = new ByteArray(INTERNAL_PREFIX + tblId);
-                futs.add(metaStorageMgr.invoke(
-                    Conditions.notExists(key),
-                    Operations.put(key, tableView.name().getBytes(StandardCharsets.UTF_8)),
-                    Operations.noop())
-                    .thenCompose(res -> schemaMgr.initSchemaForTable(tblId, tableView.name()))
-                    .thenCompose(res -> affMgr.calculateAssignments(tblId, tableView.name())));
-            }
-
-            final CompletableFuture<AffinityEventParameters> affinityReadyFut = new CompletableFuture<>();
-            final CompletableFuture<SchemaEventParameters> schemaReadyFut = new CompletableFuture<>();
-
-            CompletableFuture.allOf(affinityReadyFut, schemaReadyFut)
-                .exceptionally(e -> {
-                    LOG.error("Failed to create a new table [name=" + tblName + ", id=" + tblId + ']', e);
-
-                    onEvent(TableEvent.CREATE, new TableEventParameters(tblId, tblName), e);
-
-                    return null;
-                })
-                .thenRun(() -> createTableLocally(
-                    tblName,
-                    tblId,
-                    affinityReadyFut.join().assignment(),
-                    schemaReadyFut.join().schemaRegistry()
-                ));
+    private void dropTableLocally(String name, IgniteUuid tblId, List<List<ClusterNode>> assignment) {
+        try {
+            int partitions = assignment.size();
 
-            affMgr.listen(AffinityEvent.CALCULATED, new EventListener<>() {
-                @Override public boolean notify(@NotNull AffinityEventParameters parameters, @Nullable Throwable e) {
-                    if (!tblId.equals(parameters.tableId()))
-                        return false;
+            for (int p = 0; p < partitions; p++)
+                raftMgr.stopRaftGroup(raftGroupName(tblId, p), assignment.get(p));
 
-                    if (e == null)
-                        affinityReadyFut.complete(parameters);
-                    else
-                        affinityReadyFut.completeExceptionally(e);
+            TableImpl table = tables.get(name);
 
-                    return true;
-                }
+            assert table != null : "There is no table with the name specified [name=" + name + ']';
 
-                @Override public void remove(@NotNull Throwable e) {
-                    affinityReadyFut.completeExceptionally(e);
-                }
-            });
+            tables.remove(name);
+            tablesById.remove(table.tableId());
 
-            schemaMgr.listen(SchemaEvent.INITIALIZED, new EventListener<>() {
-                @Override public boolean notify(@NotNull SchemaEventParameters parameters, @Nullable Throwable e) {
-                    if (!tblId.equals(parameters.tableId()) && parameters.schemaRegistry().lastSchemaVersion() >= 1)
-                        return false;
+            fireEvent(TableEvent.DROP, new TableEventParameters(table), null);
 
-                    if (e == null)
-                        schemaReadyFut.complete(parameters);
-                    else
-                        schemaReadyFut.completeExceptionally(e);
-
-                    return true;
-                }
-
-                @Override public void remove(@NotNull Throwable e) {
-                    schemaReadyFut.completeExceptionally(e);
-                }
-            });
+            Optional.ofNullable(dropTblIntention.get(tblId)).ifPresent(f -> f.complete(null));
         }
-
-        return futs;
-    }
-
-    /**
-     * Drop tables routine.
-     *
-     * @param tbls Tables to drop.
-     * @return Table drop futures.
-     */
-    private List<CompletableFuture<Boolean>> stopTables(Set<String> tbls) {
-        boolean hasMetastorageLocally = metaStorageMgr.hasMetastorageLocally(nodeCfgMgr);
-
-        List<CompletableFuture<Boolean>> futs = new ArrayList<>();
-
-        for (String tblName : tbls) {
-            TableImpl t = tables.get(tblName);
-
-            UUID tblId = t.tableId();
-
-            if (hasMetastorageLocally) {
-                var key = new ByteArray(INTERNAL_PREFIX + tblId);
-
-                futs.add(affMgr.removeAssignment(tblId)
-                    .thenCompose(res -> schemaMgr.unregisterSchemas(tblId))
-                    .thenCompose(res ->
-                        metaStorageMgr.invoke(Conditions.exists(key),
-                            Operations.remove(key),
-                            Operations.noop())));
-            }
-
-            affMgr.listen(AffinityEvent.REMOVED, new EventListener<>() {
-                @Override public boolean notify(@NotNull AffinityEventParameters parameters, @Nullable Throwable e) {
-                    if (!tblId.equals(parameters.tableId()))
-                        return false;
-
-                    if (e == null)
-                        dropTableLocally(tblName, tblId, parameters.assignment());
-                    else
-                        onEvent(TableEvent.DROP, new TableEventParameters(tblId, tblName), e);
-
-                    return true;
-                }
-
-                @Override public void remove(@NotNull Throwable e) {
-                    onEvent(TableEvent.DROP, new TableEventParameters(tblId, tblName), e);
-                }
-            });
+        catch (Exception e) {

Review comment:
       Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r710761263



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/mapping/ColumnMapper.java
##########
@@ -24,6 +24,7 @@
 /**
  * Column mapper interface.
  */
+// TODO: IGNITE-15480 Probably should be removed after index column mapping refactoring.

Review comment:
       As was discussed mapping logic was restored:
   ```
                   clusterCfgMgr.configurationRegistry()
                       .getConfiguration(TablesConfiguration.KEY).tables()
                       .change(ch -> {
                           ch.createOrUpdate(name, tableChange);
                           ch.createOrUpdate(name, tblCh ->
                               ((ExtendedTableChange)tblCh).changeSchemas(
                                   schemasCh ->
                                       schemasCh.createOrUpdate(
                                           String.valueOf(schemasCh.size() + 1),
                                           schemaCh -> {
                                               ExtendedTableView currTableView = (ExtendedTableView) clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().get(name).value();
   
                                               SchemaDescriptor descriptor = SchemaUtils.prepareSchemaDescriptor(
                                                   ((ExtendedTableView)tblCh).schemas().size(),
                                                   tblCh
                                               );
   
                                               descriptor.columnMapping(SchemaUtils.columnMapper(
                                                   tablesById.get(tblId).schemaRegistry().schema(currTableView.schemas().size() -1),
                                                   currTableView,
                                                   descriptor,
                                                   tblCh
                                               ));
   
                                               schemaCh.changeSchema(ByteUtils.toBytes(descriptor));
                                           }
                                       )
                               ));
                       })
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] AMashenkov merged pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
AMashenkov merged pull request #330:
URL: https://github.com/apache/ignite-3/pull/330


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r707504488



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/mapping/ColumnMapper.java
##########
@@ -24,6 +24,7 @@
 /**
  * Column mapper interface.
  */
+// TODO: IGNITE-15480 Probably should be removed after index column mapping refactoring.

Review comment:
       I believe that within column to column in index context we don't need columnIndex (if you mean that column index here is position in columns list).
   
   From the user's perspective he creates index using column names: 
   ```
           SchemaTable scmTbl1 = SchemaBuilders.tableBuilder("PUBLIC", "tbl1").columns(
               SchemaBuilders.column("key", ColumnType.UUID).asNonNull().build(),
               SchemaBuilders.column("affKey", ColumnType.INT64).asNonNull().build(),
               SchemaBuilders.column("valStr", ColumnType.string()).asNullable().build(),
               SchemaBuilders.column("valInt", ColumnType.INT32).asNullable().build(),
               SchemaBuilders.column("valNull", ColumnType.INT16).asNullable().build()
           ).withIndex(
               SchemaBuilders.pkIndex()
                   .addIndexColumn("key").done()
                   .addIndexColumn("affKey").done()
                   .withAffinityColumns("affKey")
                   .build()
           ).build();
   ```
   However during configuration change we generate id for every column and substitute name with such id within "index schema":
   table
     name:PUBLIC.tbl1
     id: 41f7a3ca
     columns
       listItem0
         namedListKey: 0 (or "key" or 870024f6) it doesn't matter here 
         namedListVal:
           name: key
           **id: 870024f6**
           type: UUID
           nullable: false
      ...
      indexes
        ....
          name: PK_INDEX
          **columnIds: [870024f6]** instead of colNames
   
   Does it make sence?

##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/mapping/ColumnMapper.java
##########
@@ -24,6 +24,7 @@
 /**
  * Column mapper interface.
  */
+// TODO: IGNITE-15480 Probably should be removed after index column mapping refactoring.

Review comment:
       I believe that within column to column in index context we don't need columnIndex (if you mean that column index here is position in columns list).
   
   From the user's perspective he creates index using column names: 
   ```
           SchemaTable scmTbl1 = SchemaBuilders.tableBuilder("PUBLIC", "tbl1").columns(
               SchemaBuilders.column("key", ColumnType.UUID).asNonNull().build(),
               SchemaBuilders.column("affKey", ColumnType.INT64).asNonNull().build(),
               SchemaBuilders.column("valStr", ColumnType.string()).asNullable().build(),
               SchemaBuilders.column("valInt", ColumnType.INT32).asNullable().build(),
               SchemaBuilders.column("valNull", ColumnType.INT16).asNullable().build()
           ).withIndex(
               SchemaBuilders.pkIndex()
                   .addIndexColumn("key").done()
                   .addIndexColumn("affKey").done()
                   .withAffinityColumns("affKey")
                   .build()
           ).build();
   ```
   However during configuration change we generate id for every column and substitute name with such id within "index schema":
   table
     name:PUBLIC.tbl1
     id: 41f7a3ca
     columns
       listItem0
         namedListKey: 0 (or "key" or 870024f6) it doesn't matter here 
         namedListVal:
           name: key
           **id: 870024f6**
           type: UUID
           nullable: false
      ...
      indexes
        ....
          name: PK_INDEX
          **columnIds: [870024f6]** instead of colNames
   
   Does it make sense?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] AMashenkov commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
AMashenkov commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r707474562



##########
File path: modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityService.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Stateless affinity service that produces helper methods for an affinity assignments calculation.
+ */
+public class AffinityService {
+    /**
+     * Calculates affinity assignments.
+     *
+     * @param partitions Partitions count.
+     * @param replicas Replicas count.
+     * @return List nodes by partition.
+     */
+    public static List<List<ClusterNode>> calculateAssignments(

Review comment:
       I've just confused with naming. Its just a util method.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r709419030



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -123,42 +122,143 @@
     private final Map<String, TableImpl> tables = new ConcurrentHashMap<>();
 
     /** Tables. */
-    private final Map<UUID, TableImpl> tablesById = new ConcurrentHashMap<>();
+    private final Map<IgniteUuid, TableImpl> tablesById = new ConcurrentHashMap<>();
+
+    /**
+     * tableId -> future that adds an ability to await distributed table creation on local node from within the context
+     * of user's table creation intention.
+     *
+     * In other words, awaiting local {@link TableManager#createTableLocally(String, IgniteUuid, List, SchemaDescriptor)}
+     * from within {@link TableManager#createTableAsync(String, Consumer, boolean)}
+     */
+    private final Map<IgniteUuid, CompletableFuture<Table>> createTblIntention = new ConcurrentHashMap<>();
+
+    /**
+     * tableId -> future that adds an ability to await distributed table alter on local node from within the context
+     * of user's table alter intention.
+     *
+     * In other words, awaiting local alter table as a reaction on distributed event
+     * from within {@link TableManager#createTableAsync(String, Consumer, boolean)}
+     */
+    private final Map<IgniteUuid, CompletableFuture<Void>> alterTblIntention = new ConcurrentHashMap<>();
+
+    /**
+     * tableId -> future that adds an ability to await distributed table drop on local node from within the context
+     * of user's table drop intention.
+     *
+     * In other words, awaiting local {@link TableManager#dropTableLocally(String, IgniteUuid, List)}
+     * from within {@link TableManager#dropTableAsync(String)}
+     */
+    private final Map<IgniteUuid, CompletableFuture<Void>> dropTblIntention = new ConcurrentHashMap<>();
 
     /**
      * Creates a new table manager.
      *
-     * @param nodeCfgMgr Node configuration manager.
      * @param clusterCfgMgr Cluster configuration manager.
-     * @param metaStorageMgr Meta storage manager.
-     * @param schemaMgr Schema manager.
-     * @param affMgr Affinity manager.
      * @param raftMgr Raft manager.
+     * @param baselineMgr Baseline manager.
+     * @param metaStorageMgr Meta storage manager.
      * @param partitionsStoreDir Partitions store directory.
      */
     public TableManager(
-        ConfigurationManager nodeCfgMgr,
         ConfigurationManager clusterCfgMgr,
-        MetaStorageManager metaStorageMgr,
-        SchemaManager schemaMgr,
-        AffinityManager affMgr,
         Loza raftMgr,
+        BaselineManager baselineMgr,
+        MetaStorageManager metaStorageMgr,
         Path partitionsStoreDir
     ) {
-        this.nodeCfgMgr = nodeCfgMgr;
         this.clusterCfgMgr = clusterCfgMgr;
-        this.metaStorageMgr = metaStorageMgr;
-        this.affMgr = affMgr;
         this.raftMgr = raftMgr;
-        this.schemaMgr = schemaMgr;
+        this.baselineMgr = baselineMgr;
+        this.metaStorageMgr = metaStorageMgr;
         this.partitionsStoreDir = partitionsStoreDir;
     }
 
     /** {@inheritDoc} */
     @Override public void start() {
-        //TODO: IGNITE-14652 Change a metastorage update in listeners to multi-invoke
-        clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().listen(ctx -> {
-            return onConfigurationChanged(ctx.storageRevision(), ctx.oldValue(), ctx.newValue());
+        clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().
+            listenElements(new ConfigurationNamedListListener<TableView>() {
+            @Override
+            public @NotNull CompletableFuture<?> onCreate(@NotNull ConfigurationNotificationEvent<TableView> ctx) {
+                // Empty assignments might be a valid case if tables are created from within cluster init HOCON
+                // configuration, which is not supported now.
+                assert ((ExtendedTableView)ctx.newValue()).assignments() != null :
+                    "Table =[" + ctx.newValue().name() + "] has empty assignments.";
+
+                final IgniteUuid tblId = IgniteUuid.fromString(((ExtendedTableView)ctx.newValue()).id());
+
+                // TODO: IGNITE-15409 Listener with any placeholder should be used instead.
+                ((ExtendedTableConfiguration) clusterCfgMgr.configurationRegistry().
+                    getConfiguration(TablesConfiguration.KEY).tables().get(ctx.newValue().name())).schemas().
+                    listenElements(new ConfigurationNamedListListener<>() {

Review comment:
       Nope, configuration subscriptions will be automatically removed on proper cfg node removal.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r709388511



##########
File path: modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
##########
@@ -325,6 +326,33 @@ public ClientMessagePacker packUuid(UUID val) {
         return this;
     }
 
+    /**
+     * Writes an {@link IgniteUuid}.
+     *
+     * @param val {@link IgniteUuid} value.
+     * @return This instance.
+     */
+    public ClientMessagePacker packIgniteUuid(IgniteUuid val) {

Review comment:
       Well I believe that it's possible to use different approach with IgniteUUID in compassion to all other packing/unpacking logic in ClientMessagePacker and it's unpacker counterpart, however it'll be confusing.
   Current IgniteUUID packing approach is similar to UUID approach in ClientMessagePacker that internally uses org.msgpack.core.MessagePacker, packer close state, etc. Besides that there is a ticket for enhancement - https://issues.apache.org/jira/browse/IGNITE-15234
   As a result I believe that using utility method for IgniteUuid is out of the scope of Ignite-15404 ticket.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r710373143



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -173,63 +273,83 @@ public TableManager(
      * @param name Table name.
      * @param tblId Table id.
      * @param assignment Affinity assignment.
-     * @param schemaReg Schema registry for the table.
      */
     private void createTableLocally(
         String name,
-        UUID tblId,
+        IgniteUuid tblId,
         List<List<ClusterNode>> assignment,
-        SchemaRegistry schemaReg
+        SchemaDescriptor schemaDesc
     ) {
         int partitions = assignment.size();
 
         var partitionsGroupsFutures = new ArrayList<CompletableFuture<RaftGroupService>>();
 
-        Path storageDir = partitionsStoreDir.resolve(name);
+        IntStream.range(0, partitions).forEach(p ->
+            partitionsGroupsFutures.add(
+                raftMgr.prepareRaftGroup(
+                    raftGroupName(tblId, p),
+                    assignment.get(p),
+                    () -> {
+                        Path storageDir = partitionsStoreDir.resolve(name);
+
+                        try {
+                            Files.createDirectories(storageDir);
+                        }
+                        catch (IOException e) {
+                            throw new IgniteInternalException(
+                                "Failed to create partitions store directory for " + name + ": " + e.getMessage(),
+                                e
+                            );
+                        }
+
+                        return new PartitionListener(
+                            new RocksDbStorage(
+                                storageDir.resolve(String.valueOf(p)),
+                                ByteBuffer::compareTo
+                            )
+                        );
+                    }
+                )
+            )
+        );
 
-        try {
-            Files.createDirectories(storageDir);
-        } catch (IOException e) {
-            throw new IgniteInternalException(
-                "Failed to create partitions store directory for " + name + ": " + e.getMessage(),
-                e
-            );
-        }
+        CompletableFuture.allOf(partitionsGroupsFutures.toArray(CompletableFuture[]::new)).thenRun(() -> {
+            try {
+                HashMap<Integer, RaftGroupService> partitionMap = new HashMap<>(partitions);
 
-        for (int p = 0; p < partitions; p++) {
-            RocksDbStorage storage = new RocksDbStorage(
-                storageDir.resolve(String.valueOf(p)),
-                ByteBuffer::compareTo
-            );
-
-            partitionsGroupsFutures.add(raftMgr.prepareRaftGroup(
-                raftGroupName(tblId, p),
-                assignment.get(p),
-                new PartitionListener(storage)
-            ));
-        }
+                for (int p = 0; p < partitions; p++) {
+                    CompletableFuture<RaftGroupService> future = partitionsGroupsFutures.get(p);
 
-        CompletableFuture.allOf(partitionsGroupsFutures.toArray(CompletableFuture[]::new)).thenRun(() -> {
-            HashMap<Integer, RaftGroupService> partitionMap = new HashMap<>(partitions);
+                    assert future.isDone();
 
-            for (int p = 0; p < partitions; p++) {
-                CompletableFuture<RaftGroupService> future = partitionsGroupsFutures.get(p);
+                    RaftGroupService service = future.join();
 
-                assert future.isDone();
+                    partitionMap.put(p, service);
+                }
 
-                RaftGroupService service = future.join();
+                InternalTableImpl internalTable = new InternalTableImpl(name, tblId, partitionMap, partitions);
 
-                partitionMap.put(p, service);
-            }
+                var schemaRegistry = new SchemaRegistryImpl(v -> schemaDesc);
 
-            InternalTableImpl internalTable = new InternalTableImpl(name, tblId, partitionMap, partitions);
+                schemaRegistry.onSchemaRegistered(schemaDesc);
+
+                var table = new TableImpl(
+                    internalTable,
+                    schemaRegistry,
+                    TableManager.this,
+                    null
+                );
 
-            var table = new TableImpl(internalTable, schemaReg, this, null);
+                tables.put(name, table);
+                tablesById.put(tblId, table);
 
-            tables.put(name, table);
-            tablesById.put(table.tableId(), table);
+                fireEvent(TableEvent.CREATE, new TableEventParameters(table), null);
 
-            onEvent(TableEvent.CREATE, new TableEventParameters(table), null);
+                Optional.ofNullable(createTblIntention.get(tblId)).ifPresent(f -> f.complete(table));
+            }
+            catch (Exception e) {

Review comment:
       Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r709410163



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -566,52 +423,84 @@ private void dropTableLocally(String name, UUID tblId, List<List<ClusterNode>> a
      * {@code false} means the existing table will be returned.
      * @return A table instance.
      */
-    private CompletableFuture<Table> createTableAsync(String name, Consumer<TableChange> tableInitChange, boolean exceptionWhenExist) {
+    private CompletableFuture<Table> createTableAsync(
+        String name,
+        Consumer<TableChange> tableInitChange,
+        boolean exceptionWhenExist
+    ) {
         CompletableFuture<Table> tblFut = new CompletableFuture<>();
 
-        EventListener<TableEventParameters> clo = new EventListener<>() {
-            @Override public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) {
-                String tableName = parameters.tableName();
-
-                if (!name.equals(tableName))
-                    return false;
-
-                if (e == null)
-                    tblFut.complete(parameters.table());
-                else
-                    tblFut.completeExceptionally(e);
-
-                return true;
-            }
-
-            @Override public void remove(@NotNull Throwable e) {
-                tblFut.completeExceptionally(e);
-            }
-        };
-
-        listen(TableEvent.CREATE, clo);
-
         tableAsync(name, true).thenAccept(tbl -> {
             if (tbl != null) {
                 if (exceptionWhenExist) {
-                    removeListener(TableEvent.CREATE, clo, new IgniteInternalCheckedException(
-                            LoggerMessageHelper.format("Table already exists [name={}]", name)));
-                } else if (tblFut.complete(tbl))
-                    removeListener(TableEvent.CREATE, clo);
-            } else {
-                try {
-                    clusterCfgMgr
-                        .configurationRegistry()
-                        .getConfiguration(TablesConfiguration.KEY)
-                        .tables()
-                        .change(change -> change.create(name, tableInitChange))
-                        .get();
+                    tblFut.completeExceptionally(new IgniteInternalCheckedException(
+                        LoggerMessageHelper.format("Table already exists [name={}]", name)));
                 }
-                catch (InterruptedException | ExecutionException e) {
-                    LOG.error("Table wasn't created [name=" + name + ']', e);
+                else
+                    tblFut.complete(tbl);
+            }
+            else {
+                IgniteUuid tblId = TABLE_ID_GENERATOR.randomUuid();
 
-                    removeListener(TableEvent.CREATE, clo, new IgniteInternalCheckedException(e));
-                }
+                createTblIntention.put(tblId, new CompletableFuture<>());
+
+                clusterCfgMgr
+                    .configurationRegistry()
+                    .getConfiguration(TablesConfiguration.KEY)
+                    .tables()
+                    .change(
+                        change -> change.create(
+                            name,
+                            (ch) -> {
+                                tableInitChange.accept(ch);
+                                ((ExtendedTableChange)ch).
+                                    // Table id specification.
+                                    changeId(tblId.toString()).
+                                    // Affinity assignments calculation.
+                                    changeAssignments(
+                                        ByteUtils.toBytes(
+                                            AffinityService.calculateAssignments(
+                                                baselineMgr.nodes(),
+                                                ch.partitions(),
+                                                ch.replicas()
+                                            )
+                                        )
+                                    ).
+                                    // Table schema preparation.
+                                    changeSchemas(
+                                        schemasCh -> schemasCh.create(
+                                            String.valueOf(INITIAL_SCHEMA_VERSION),
+                                            schemaCh -> schemaCh.changeSchema(
+                                                ByteUtils.toBytes(
+                                                    SchemaService.prepareSchemaDescriptor(
+                                                        ((ExtendedTableView)ch).schemas().size(),
+                                                        ch
+                                                    )
+                                                )
+                                            )
+                                        )
+                                    );
+                            }
+                        )
+                    )
+                    .thenRun(() -> createTblIntention.get(tblId).thenApply(tblFut::complete)
+                        .thenRun(() -> createTblIntention.remove(tblId))
+                        .exceptionally(throwable -> {
+                            createTblIntention.remove(tblId);
+
+                            tblFut.completeExceptionally(new IgniteException(throwable));
+
+                            return null;
+                        }))
+                    .exceptionally(throwable -> {
+                        LOG.error("Table wasn't created [name=" + name + ']', throwable);

Review comment:
       Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r709403152



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -123,42 +122,143 @@
     private final Map<String, TableImpl> tables = new ConcurrentHashMap<>();
 
     /** Tables. */
-    private final Map<UUID, TableImpl> tablesById = new ConcurrentHashMap<>();
+    private final Map<IgniteUuid, TableImpl> tablesById = new ConcurrentHashMap<>();
+
+    /**
+     * tableId -> future that adds an ability to await distributed table creation on local node from within the context
+     * of user's table creation intention.
+     *
+     * In other words, awaiting local {@link TableManager#createTableLocally(String, IgniteUuid, List, SchemaDescriptor)}
+     * from within {@link TableManager#createTableAsync(String, Consumer, boolean)}
+     */
+    private final Map<IgniteUuid, CompletableFuture<Table>> createTblIntention = new ConcurrentHashMap<>();
+
+    /**
+     * tableId -> future that adds an ability to await distributed table alter on local node from within the context
+     * of user's table alter intention.
+     *
+     * In other words, awaiting local alter table as a reaction on distributed event
+     * from within {@link TableManager#createTableAsync(String, Consumer, boolean)}
+     */
+    private final Map<IgniteUuid, CompletableFuture<Void>> alterTblIntention = new ConcurrentHashMap<>();
+
+    /**
+     * tableId -> future that adds an ability to await distributed table drop on local node from within the context
+     * of user's table drop intention.
+     *
+     * In other words, awaiting local {@link TableManager#dropTableLocally(String, IgniteUuid, List)}
+     * from within {@link TableManager#dropTableAsync(String)}
+     */
+    private final Map<IgniteUuid, CompletableFuture<Void>> dropTblIntention = new ConcurrentHashMap<>();
 
     /**
      * Creates a new table manager.
      *
-     * @param nodeCfgMgr Node configuration manager.
      * @param clusterCfgMgr Cluster configuration manager.
-     * @param metaStorageMgr Meta storage manager.
-     * @param schemaMgr Schema manager.
-     * @param affMgr Affinity manager.
      * @param raftMgr Raft manager.
+     * @param baselineMgr Baseline manager.
+     * @param metaStorageMgr Meta storage manager.
      * @param partitionsStoreDir Partitions store directory.
      */
     public TableManager(
-        ConfigurationManager nodeCfgMgr,
         ConfigurationManager clusterCfgMgr,
-        MetaStorageManager metaStorageMgr,
-        SchemaManager schemaMgr,
-        AffinityManager affMgr,
         Loza raftMgr,
+        BaselineManager baselineMgr,
+        MetaStorageManager metaStorageMgr,
         Path partitionsStoreDir
     ) {
-        this.nodeCfgMgr = nodeCfgMgr;
         this.clusterCfgMgr = clusterCfgMgr;
-        this.metaStorageMgr = metaStorageMgr;
-        this.affMgr = affMgr;
         this.raftMgr = raftMgr;
-        this.schemaMgr = schemaMgr;
+        this.baselineMgr = baselineMgr;
+        this.metaStorageMgr = metaStorageMgr;
         this.partitionsStoreDir = partitionsStoreDir;
     }
 
     /** {@inheritDoc} */
     @Override public void start() {
-        //TODO: IGNITE-14652 Change a metastorage update in listeners to multi-invoke
-        clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().listen(ctx -> {
-            return onConfigurationChanged(ctx.storageRevision(), ctx.oldValue(), ctx.newValue());
+        clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().
+            listenElements(new ConfigurationNamedListListener<TableView>() {
+            @Override
+            public @NotNull CompletableFuture<?> onCreate(@NotNull ConfigurationNotificationEvent<TableView> ctx) {
+                // Empty assignments might be a valid case if tables are created from within cluster init HOCON
+                // configuration, which is not supported now.
+                assert ((ExtendedTableView)ctx.newValue()).assignments() != null :
+                    "Table =[" + ctx.newValue().name() + "] has empty assignments.";
+
+                final IgniteUuid tblId = IgniteUuid.fromString(((ExtendedTableView)ctx.newValue()).id());
+
+                // TODO: IGNITE-15409 Listener with any placeholder should be used instead.
+                ((ExtendedTableConfiguration) clusterCfgMgr.configurationRegistry().
+                    getConfiguration(TablesConfiguration.KEY).tables().get(ctx.newValue().name())).schemas().
+                    listenElements(new ConfigurationNamedListListener<>() {
+                        @Override public @NotNull CompletableFuture<?> onCreate(
+                            @NotNull ConfigurationNotificationEvent<SchemaView> schemasCtx) {
+                            try {
+                                ((SchemaRegistryImpl)tables.get(ctx.newValue().name()).schemaRegistry()).
+                                    onSchemaRegistered((SchemaDescriptor)ByteUtils.
+                                        fromBytes(schemasCtx.newValue().schema()));
+
+                                fireEvent(TableEvent.ALTER, new TableEventParameters(tablesById.get(tblId)), null);
+
+                                Optional.ofNullable(alterTblIntention.get(tblId)).ifPresent(f -> f.complete(null));
+                            }
+                            catch (Exception e) {
+                                Optional.ofNullable(alterTblIntention.get(tblId)).ifPresent(f -> f.completeExceptionally(e));
+                            }
+
+                            return CompletableFuture.completedFuture(null);
+                        }
+
+                        @Override
+                        public @NotNull CompletableFuture<?> onRename(@NotNull String oldName, @NotNull String newName,
+                            @NotNull ConfigurationNotificationEvent<SchemaView> ctx) {
+                            return CompletableFuture.completedFuture(null);
+                        }
+
+                        @Override public @NotNull CompletableFuture<?> onDelete(
+                            @NotNull ConfigurationNotificationEvent<SchemaView> ctx) {
+                            return CompletableFuture.completedFuture(null);
+                        }
+
+                        @Override public @NotNull CompletableFuture<?> onUpdate(
+                            @NotNull ConfigurationNotificationEvent<SchemaView> ctx) {
+                            return CompletableFuture.completedFuture(null);
+                        }
+                    });
+
+                createTableLocally(
+                    ctx.newValue().name(),
+                    IgniteUuid.fromString(((ExtendedTableView)ctx.newValue()).id()),
+                    (List<List<ClusterNode>>)ByteUtils.fromBytes(((ExtendedTableView)ctx.newValue()).assignments()),
+                    (SchemaDescriptor)ByteUtils.fromBytes(((ExtendedTableView)ctx.newValue()).schemas().
+                        get(String.valueOf(INITIAL_SCHEMA_VERSION)).schema())
+                );
+
+                return CompletableFuture.completedFuture(null);
+            }
+
+            @Override public @NotNull CompletableFuture<?> onRename(@NotNull String oldName, @NotNull String newName,
+                @NotNull ConfigurationNotificationEvent<TableView> ctx) {
+                // TODO: IGNITE-15485 Support table rename operation.
+
+                return CompletableFuture.completedFuture(null);
+            }
+
+            @Override

Review comment:
       Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r707418957



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/mapping/ColumnMapper.java
##########
@@ -24,6 +24,7 @@
 /**
  * Column mapper interface.
  */
+// TODO: IGNITE-15480 Probably should be removed after index column mapping refactoring.

Review comment:
       I mean that index should be mapped to columns by column_id's that will be a part of internal configuration extension. column_id's will be final, so column renaming won't effect column-to-index mapping.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] zstan commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
zstan commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r708054436



##########
File path: modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
##########
@@ -82,15 +83,18 @@ public Loza(ClusterService clusterNetSvc, Path dataPath) {
      * @param lsnr Raft group listener.
      * @return Future representing pending completion of the operation.
      */
-    public CompletableFuture<RaftGroupService> prepareRaftGroup(String groupId, List<ClusterNode> nodes, RaftGroupListener lsnr) {
+    public CompletableFuture<RaftGroupService> prepareRaftGroup(
+        String groupId,
+        List<ClusterNode> nodes,
+        Supplier<RaftGroupListener> lsnrSupplier) {

Review comment:
       change javadoc plz




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r707388664



##########
File path: modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
##########
@@ -325,6 +326,33 @@ public ClientMessagePacker packUuid(UUID val) {
         return this;
     }
 
+    /**
+     * Writes an {@link IgniteUuid}.
+     *
+     * @param val {@link IgniteUuid} value.
+     * @return This instance.
+     */
+    public ClientMessagePacker packIgniteUuid(IgniteUuid val) {

Review comment:
       If you mean that there should be an unpack counterpart, there is some org.apache.ignite.internal.client.proto.ClientMessageUnpacker#unpackIgniteUuid




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r710372849



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -123,42 +122,143 @@
     private final Map<String, TableImpl> tables = new ConcurrentHashMap<>();
 
     /** Tables. */
-    private final Map<UUID, TableImpl> tablesById = new ConcurrentHashMap<>();
+    private final Map<IgniteUuid, TableImpl> tablesById = new ConcurrentHashMap<>();
+
+    /**
+     * tableId -> future that adds an ability to await distributed table creation on local node from within the context
+     * of user's table creation intention.
+     *
+     * In other words, awaiting local {@link TableManager#createTableLocally(String, IgniteUuid, List, SchemaDescriptor)}
+     * from within {@link TableManager#createTableAsync(String, Consumer, boolean)}
+     */
+    private final Map<IgniteUuid, CompletableFuture<Table>> createTblIntention = new ConcurrentHashMap<>();

Review comment:
       Well, seems that local events deserves their own refactoring, reverted so far, as Vlad suggested. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] zstan commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
zstan commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r710793168



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+import java.util.Optional;
+import org.apache.ignite.configuration.schemas.table.ColumnView;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
+import org.apache.ignite.internal.schema.configuration.SchemaDescriptorConverter;
+import org.apache.ignite.internal.schema.mapping.ColumnMapper;
+import org.apache.ignite.internal.schema.mapping.ColumnMapping;
+import org.apache.ignite.lang.LoggerMessageHelper;
+import org.apache.ignite.schema.SchemaTable;
+
+/**
+ * Stateless schema utils that produces helper methods for schema preparation.
+ * <p>
+ * Schemas itself MUST be registered in a version ascending order incrementing by {@code 1} with NO gaps,
+ * otherwise an exception will be thrown. The version numbering starts from the {@code 1}.
+ * <p>
+ * After some table maintenance process some first versions may become outdated and can be safely cleaned up
+ * if the process guarantees the table no longer has a data of these versions.
+ *
+ * @implSpec The changes in between two arbitrary actual versions MUST NOT be lost.
+ * Thus, schema versions can only be removed from the beginning.
+ * @implSpec Initial schema history MAY be registered without the first outdated versions
+ * that could be cleaned up earlier.
+ */
+public class SchemaUtils {
+    /**
+     * Creates schema descriptor for the table with specified configuration.
+     *
+     * @param schemaVer Schema version.
+     * @param tblCfg Table configuration.
+     * @return Schema descriptor.
+     */
+    public static SchemaDescriptor prepareSchemaDescriptor(int schemaVer, TableView tblCfg) {
+        SchemaTable schemaTbl = SchemaConfigurationConverter.convert(tblCfg);
+
+        return SchemaDescriptorConverter.convert(schemaVer, schemaTbl);
+    }
+
+    /**
+     * Prepares column mapper.
+     *
+     * @param oldDesc Old schema descriptor.
+     * @param oldTbl Old table configuration.
+     * @param newDesc New schema descriptor.
+     * @param newTbl New table configuration.
+     * @return Column mapper.
+     */
+    public static ColumnMapper columnMapper(
+        SchemaDescriptor oldDesc,
+        TableView oldTbl,
+        SchemaDescriptor newDesc,
+        TableView newTbl
+    ) {
+        ColumnMapper mapper = null;
+
+        for (String s : newTbl.columns().namedListKeys()) {
+            final ColumnView newColView = newTbl.columns().get(s);
+            final ColumnView oldColView = oldTbl.columns().get(s);
+
+            if (oldColView == null) {
+                final Column newCol = newDesc.column(newColView.name());
+
+                assert !newDesc.isKeyColumn(newCol.schemaIndex());
+
+                if (mapper == null)
+                    mapper = ColumnMapping.createMapper(newDesc);
+
+                mapper.add(newCol); // New column added.
+            }
+            else {
+                final Column newCol = newDesc.column(newColView.name());
+                final Column oldCol = oldDesc.column(oldColView.name());
+
+                // TODO: IGNITE-15414 Assertion just in case, proper validation should be implemented with the help of
+                // TODO: configuration validators.
+                assert newCol.type().equals(oldCol.type()) :
+                    LoggerMessageHelper.format(
+                        "Column types doesn't match [column={}, oldType={}, newType={}",
+                        oldCol.name(),
+                        oldCol.type(),
+                        newCol.type()
+                    );
+
+                assert newCol.nullable() == oldCol.nullable() :
+                    LoggerMessageHelper.format(
+                        "Column nullable properties doesn't match [column={}, oldNullable={}, newNullable={}",
+                        oldCol.name(),
+                        oldCol.nullable(),
+                        newCol.nullable()
+                    );
+
+                if (newCol.schemaIndex() == oldCol.schemaIndex())
+                    continue;
+
+                if (mapper == null)
+                    mapper = ColumnMapping.createMapper(newDesc);
+
+                mapper.add(newCol.schemaIndex(), oldCol.schemaIndex());
+            }
+        }
+
+        final Optional<Column> droppedKeyCol = oldTbl.columns().namedListKeys().stream()
+            .filter(k -> newTbl.columns().get(k) == null)

Review comment:
       Objects::nonNull - stream way )




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] AMashenkov commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
AMashenkov commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r708106468



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/mapping/ColumnMapper.java
##########
@@ -24,6 +24,7 @@
 /**
  * Column mapper interface.
  */
+// TODO: IGNITE-15480 Probably should be removed after index column mapping refactoring.

Review comment:
       > we don't need columnIndex (if you mean that column index here is position in columns list).
   I mean column index in Columns class. E,g, see SchemaDescriptor.keyCols.
   We need a specific order for serialization: key columns prior to value columns and fixlen types prior to varlen types.
   Also, we want null-map to be a bitset.
   

##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/mapping/ColumnMapper.java
##########
@@ -24,6 +24,7 @@
 /**
  * Column mapper interface.
  */
+// TODO: IGNITE-15480 Probably should be removed after index column mapping refactoring.

Review comment:
       > we don't need columnIndex (if you mean that column index here is position in columns list).
   
   I mean column index in Columns class. E,g, see SchemaDescriptor.keyCols.
   We need a specific order for serialization: key columns prior to value columns and fixlen types prior to varlen types.
   Also, we want null-map to be a bitset.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r709401399



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -123,42 +122,143 @@
     private final Map<String, TableImpl> tables = new ConcurrentHashMap<>();
 
     /** Tables. */
-    private final Map<UUID, TableImpl> tablesById = new ConcurrentHashMap<>();
+    private final Map<IgniteUuid, TableImpl> tablesById = new ConcurrentHashMap<>();
+
+    /**
+     * tableId -> future that adds an ability to await distributed table creation on local node from within the context
+     * of user's table creation intention.
+     *
+     * In other words, awaiting local {@link TableManager#createTableLocally(String, IgniteUuid, List, SchemaDescriptor)}
+     * from within {@link TableManager#createTableAsync(String, Consumer, boolean)}
+     */
+    private final Map<IgniteUuid, CompletableFuture<Table>> createTblIntention = new ConcurrentHashMap<>();
+
+    /**
+     * tableId -> future that adds an ability to await distributed table alter on local node from within the context
+     * of user's table alter intention.
+     *
+     * In other words, awaiting local alter table as a reaction on distributed event
+     * from within {@link TableManager#createTableAsync(String, Consumer, boolean)}
+     */
+    private final Map<IgniteUuid, CompletableFuture<Void>> alterTblIntention = new ConcurrentHashMap<>();
+
+    /**
+     * tableId -> future that adds an ability to await distributed table drop on local node from within the context
+     * of user's table drop intention.
+     *
+     * In other words, awaiting local {@link TableManager#dropTableLocally(String, IgniteUuid, List)}
+     * from within {@link TableManager#dropTableAsync(String)}
+     */
+    private final Map<IgniteUuid, CompletableFuture<Void>> dropTblIntention = new ConcurrentHashMap<>();
 
     /**
      * Creates a new table manager.
      *
-     * @param nodeCfgMgr Node configuration manager.
      * @param clusterCfgMgr Cluster configuration manager.
-     * @param metaStorageMgr Meta storage manager.
-     * @param schemaMgr Schema manager.
-     * @param affMgr Affinity manager.
      * @param raftMgr Raft manager.
+     * @param baselineMgr Baseline manager.
+     * @param metaStorageMgr Meta storage manager.
      * @param partitionsStoreDir Partitions store directory.
      */
     public TableManager(
-        ConfigurationManager nodeCfgMgr,
         ConfigurationManager clusterCfgMgr,
-        MetaStorageManager metaStorageMgr,
-        SchemaManager schemaMgr,
-        AffinityManager affMgr,
         Loza raftMgr,
+        BaselineManager baselineMgr,
+        MetaStorageManager metaStorageMgr,
         Path partitionsStoreDir
     ) {
-        this.nodeCfgMgr = nodeCfgMgr;
         this.clusterCfgMgr = clusterCfgMgr;
-        this.metaStorageMgr = metaStorageMgr;
-        this.affMgr = affMgr;
         this.raftMgr = raftMgr;
-        this.schemaMgr = schemaMgr;
+        this.baselineMgr = baselineMgr;
+        this.metaStorageMgr = metaStorageMgr;
         this.partitionsStoreDir = partitionsStoreDir;
     }
 
     /** {@inheritDoc} */
     @Override public void start() {
-        //TODO: IGNITE-14652 Change a metastorage update in listeners to multi-invoke
-        clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().listen(ctx -> {
-            return onConfigurationChanged(ctx.storageRevision(), ctx.oldValue(), ctx.newValue());
+        clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().
+            listenElements(new ConfigurationNamedListListener<TableView>() {
+            @Override
+            public @NotNull CompletableFuture<?> onCreate(@NotNull ConfigurationNotificationEvent<TableView> ctx) {
+                // Empty assignments might be a valid case if tables are created from within cluster init HOCON
+                // configuration, which is not supported now.
+                assert ((ExtendedTableView)ctx.newValue()).assignments() != null :
+                    "Table =[" + ctx.newValue().name() + "] has empty assignments.";
+
+                final IgniteUuid tblId = IgniteUuid.fromString(((ExtendedTableView)ctx.newValue()).id());

Review comment:
       That's correct, but ctx.newValue() might be null only in case of drop event Wihtin onCreate it'll always be not null. It's actually a matter or common ConfigurationNotificationEvent interface: in case of create oldValue is null, in case of drop - new one.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] zstan commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
zstan commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r707944133



##########
File path: modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityServiceTest.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests scenarios for an affinity service. Please pay attention that given test doesn't check Rendezvous or any other
+ * affinity function it just checks {@link AffinityService} logic.
+ */
+public class AffinityServiceTest {
+    /**
+     *
+     */
+    @Test
+    public void testCalculatedAssignmentHappyPath() {
+        List<List<ClusterNode>> assignments = AffinityService.calculateAssignments(
+            Arrays.asList(
+                new ClusterNode(
+                    UUID.randomUUID().toString(), "node0",
+                    new NetworkAddress("localhost", 8080)
+                ),
+                new ClusterNode(
+                    UUID.randomUUID().toString(), "node1",
+                    new NetworkAddress("localhost", 8081)
+                )
+

Review comment:
       empty line




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r707422172



##########
File path: modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityService.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Stateless affinity service that produces helper methods for an affinity assignments calculation.
+ */
+public class AffinityService {
+    /**
+     * Calculates affinity assignments.
+     *
+     * @param partitions Partitions count.
+     * @param replicas Replicas count.
+     * @return List nodes by partition.
+     */
+    public static List<List<ClusterNode>> calculateAssignments(

Review comment:
       It calculates an assignment.
   What's the point of making it non-static?
   The same is about SchemaService (ex SchemaManager).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] AMashenkov commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
AMashenkov commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r707484273



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/mapping/ColumnMapper.java
##########
@@ -24,6 +24,7 @@
 /**
  * Column mapper interface.
  */
+// TODO: IGNITE-15480 Probably should be removed after index column mapping refactoring.

Review comment:
       Internal column sort order differs from the user provided in config.
   Now, column index is an index in array of columns.
   
   So, we need mapping name->columnID->columnIndex. There is a ticket for this [1].
   
   [1] https://issues.apache.org/jira/browse/IGNITE-13667




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r708316009



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/configuration/schema/ExtendedTableConfigurationSchema.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.configuration.schema;
+
+import org.apache.ignite.configuration.annotation.InternalConfiguration;
+import org.apache.ignite.configuration.annotation.NamedConfigValue;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.schemas.table.TableConfigurationSchema;
+import org.apache.ignite.configuration.validation.Immutable;
+
+/**
+ * Extended table configuration schema class.
+ */
+@InternalConfiguration
+public class ExtendedTableConfigurationSchema extends TableConfigurationSchema {
+    /** Table id. String representation of {@link org.apache.ignite.lang.IgniteUuid}. */
+    @Value
+    @Immutable
+    public String id;

Review comment:
       Why cannot we use IgniteUuid as a type of configuration?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #330: IGNITE-15404 Rework disctributed configuration flow.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r709414143



##########
File path: modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
##########
@@ -325,6 +326,33 @@ public ClientMessagePacker packUuid(UUID val) {
         return this;
     }
 
+    /**
+     * Writes an {@link IgniteUuid}.
+     *
+     * @param val {@link IgniteUuid} value.
+     * @return This instance.
+     */
+    public ClientMessagePacker packIgniteUuid(IgniteUuid val) {
+        assert !closed : "Packer is closed";
+
+        packExtensionTypeHeader(ClientMsgPackType.IGNITE_UUID, 24);
+
+        // TODO: Pack directly to ByteBuf without allocating IGNITE-15234.
+        var bytes = new byte[24];
+        ByteBuffer bb = ByteBuffer.wrap(bytes);
+
+        UUID globalId = val.globalId();
+
+        bb.putLong(globalId.getMostSignificantBits());
+        bb.putLong(globalId.getLeastSignificantBits());
+
+        bb.putLong(val.localId());
+
+        writePayload(bytes);
+
+        return this;

Review comment:
       Well, I still think it should be a part of some other ticket, cause we have same similar logic for packing other data types. If it should be refactored it should be refactored for all cases, WDYT?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org